@@ -136,6 +136,10 @@ type circuitBreakerBase struct {
136
136
retryTimeoutMs uint32
137
137
// nextRetryTimestampMs is the time circuit breaker could probe
138
138
nextRetryTimestampMs uint64
139
+ // probeNumber is the number of probe requests that are allowed to pass when the circuit breaker is half open.
140
+ probeNumber uint64
141
+ // curProbeNumber is the real-time probe number.
142
+ curProbeNumber uint64
139
143
// state is the state machine of circuit breaker
140
144
state * State
141
145
}
@@ -156,6 +160,14 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
156
160
atomic .StoreUint64 (& b .nextRetryTimestampMs , util .CurrentTimeMillis ()+ uint64 (b .retryTimeoutMs ))
157
161
}
158
162
163
+ func (b * circuitBreakerBase ) addCurProbeNum () {
164
+ atomic .AddUint64 (& b .curProbeNumber , 1 )
165
+ }
166
+
167
+ func (b * circuitBreakerBase ) resetCurProbeNum () {
168
+ atomic .StoreUint64 (& b .curProbeNumber , 0 )
169
+ }
170
+
159
171
// fromClosedToOpen updates circuit breaker state machine from closed to open.
160
172
// Return true only if current goroutine successfully accomplished the transformation.
161
173
func (b * circuitBreakerBase ) fromClosedToOpen (snapshot interface {}) bool {
@@ -206,6 +218,7 @@ func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool {
206
218
// Return true only if current goroutine successfully accomplished the transformation.
207
219
func (b * circuitBreakerBase ) fromHalfOpenToOpen (snapshot interface {}) bool {
208
220
if b .state .cas (HalfOpen , Open ) {
221
+ b .resetCurProbeNum ()
209
222
b .updateNextRetryTimestamp ()
210
223
for _ , listener := range stateChangeListeners {
211
224
listener .OnTransformToOpen (HalfOpen , * b .rule , snapshot )
@@ -221,6 +234,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
221
234
// Return true only if current goroutine successfully accomplished the transformation.
222
235
func (b * circuitBreakerBase ) fromHalfOpenToClosed () bool {
223
236
if b .state .cas (HalfOpen , Closed ) {
237
+ b .resetCurProbeNum ()
224
238
for _ , listener := range stateChangeListeners {
225
239
listener .OnTransformToClosed (HalfOpen , * b .rule )
226
240
}
@@ -247,6 +261,7 @@ func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowR
247
261
retryTimeoutMs : r .RetryTimeoutMs ,
248
262
nextRetryTimestampMs : 0 ,
249
263
state : newState (),
264
+ probeNumber : r .ProbeNum ,
250
265
},
251
266
stat : stat ,
252
267
maxAllowedRt : r .MaxAllowedRtMs ,
@@ -282,6 +297,8 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
282
297
if b .retryTimeoutArrived () && b .fromOpenToHalfOpen (ctx ) {
283
298
return true
284
299
}
300
+ } else if curStatus == HalfOpen && b .probeNumber > 0 {
301
+ return true
285
302
}
286
303
return false
287
304
}
@@ -318,9 +335,12 @@ func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
318
335
// fail to probe
319
336
b .fromHalfOpenToOpen (1.0 )
320
337
} else {
321
- // succeed to probe
322
- b .fromHalfOpenToClosed ()
323
- b .resetMetric ()
338
+ b .addCurProbeNum ()
339
+ if b .probeNumber == 0 || atomic .LoadUint64 (& b .curProbeNumber ) == b .probeNumber {
340
+ // succeed to probe
341
+ b .fromHalfOpenToClosed ()
342
+ b .resetMetric ()
343
+ }
324
344
}
325
345
return
326
346
}
@@ -433,6 +453,7 @@ func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
433
453
retryTimeoutMs : r .RetryTimeoutMs ,
434
454
nextRetryTimestampMs : 0 ,
435
455
state : newState (),
456
+ probeNumber : r .ProbeNum ,
436
457
},
437
458
minRequestAmount : r .MinRequestAmount ,
438
459
errorRatioThreshold : r .Threshold ,
@@ -465,6 +486,8 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
465
486
if b .retryTimeoutArrived () && b .fromOpenToHalfOpen (ctx ) {
466
487
return true
467
488
}
489
+ } else if curStatus == HalfOpen && b .probeNumber > 0 {
490
+ return true
468
491
}
469
492
return false
470
493
}
@@ -498,8 +521,11 @@ func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
498
521
}
499
522
if curStatus == HalfOpen {
500
523
if err == nil {
501
- b .fromHalfOpenToClosed ()
502
- b .resetMetric ()
524
+ b .addCurProbeNum ()
525
+ if b .probeNumber == 0 || atomic .LoadUint64 (& b .curProbeNumber ) == b .probeNumber {
526
+ b .fromHalfOpenToClosed ()
527
+ b .resetMetric ()
528
+ }
503
529
} else {
504
530
b .fromHalfOpenToOpen (1.0 )
505
531
}
@@ -612,6 +638,7 @@ func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
612
638
retryTimeoutMs : r .RetryTimeoutMs ,
613
639
nextRetryTimestampMs : 0 ,
614
640
state : newState (),
641
+ probeNumber : r .ProbeNum ,
615
642
},
616
643
minRequestAmount : r .MinRequestAmount ,
617
644
errorCountThreshold : uint64 (r .Threshold ),
@@ -644,6 +671,8 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
644
671
if b .retryTimeoutArrived () && b .fromOpenToHalfOpen (ctx ) {
645
672
return true
646
673
}
674
+ } else if curStatus == HalfOpen && b .probeNumber > 0 {
675
+ return true
647
676
}
648
677
return false
649
678
}
@@ -675,8 +704,11 @@ func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
675
704
}
676
705
if curStatus == HalfOpen {
677
706
if err == nil {
678
- b .fromHalfOpenToClosed ()
679
- b .resetMetric ()
707
+ b .addCurProbeNum ()
708
+ if b .probeNumber == 0 || atomic .LoadUint64 (& b .curProbeNumber ) == b .probeNumber {
709
+ b .fromHalfOpenToClosed ()
710
+ b .resetMetric ()
711
+ }
680
712
} else {
681
713
b .fromHalfOpenToOpen (1 )
682
714
}
0 commit comments