Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

circuit breaker supported probe number #428

Merged
merged 1 commit into from
Dec 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ type circuitBreakerBase struct {
retryTimeoutMs uint32
// nextRetryTimestampMs is the time circuit breaker could probe
nextRetryTimestampMs uint64
// probeNumber is the number of probe requests that are allowed to pass when the circuit breaker is half open.
probeNumber uint64
// curProbeNumber is the real-time probe number.
curProbeNumber uint64
// state is the state machine of circuit breaker
state *State
}
Expand All @@ -156,6 +160,14 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
atomic.StoreUint64(&b.nextRetryTimestampMs, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs))
}

func (b *circuitBreakerBase) addCurProbeNum() {
atomic.AddUint64(&b.curProbeNumber, 1)
}

func (b *circuitBreakerBase) resetCurProbeNum() {
atomic.StoreUint64(&b.curProbeNumber, 0)
}

// fromClosedToOpen updates circuit breaker state machine from closed to open.
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
Expand Down Expand Up @@ -206,6 +218,7 @@ func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool {
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
if b.state.cas(HalfOpen, Open) {
b.resetCurProbeNum()
b.updateNextRetryTimestamp()
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(HalfOpen, *b.rule, snapshot)
Expand All @@ -221,6 +234,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
if b.state.cas(HalfOpen, Closed) {
b.resetCurProbeNum()
for _, listener := range stateChangeListeners {
listener.OnTransformToClosed(HalfOpen, *b.rule)
}
Expand All @@ -247,6 +261,7 @@ func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowR
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
stat: stat,
maxAllowedRt: r.MaxAllowedRtMs,
Expand Down Expand Up @@ -282,6 +297,8 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -318,9 +335,12 @@ func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
// fail to probe
b.fromHalfOpenToOpen(1.0)
} else {
// succeed to probe
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
// succeed to probe
b.fromHalfOpenToClosed()
b.resetMetric()
}
}
return
}
Expand Down Expand Up @@ -433,6 +453,7 @@ func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
minRequestAmount: r.MinRequestAmount,
errorRatioThreshold: r.Threshold,
Expand Down Expand Up @@ -465,6 +486,8 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -498,8 +521,11 @@ func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
}
if curStatus == HalfOpen {
if err == nil {
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
b.fromHalfOpenToClosed()
b.resetMetric()
}
} else {
b.fromHalfOpenToOpen(1.0)
}
Expand Down Expand Up @@ -612,6 +638,7 @@ func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
minRequestAmount: r.MinRequestAmount,
errorCountThreshold: uint64(r.Threshold),
Expand Down Expand Up @@ -644,6 +671,8 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -675,8 +704,11 @@ func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
}
if curStatus == HalfOpen {
if err == nil {
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
b.fromHalfOpenToClosed()
b.resetMetric()
}
} else {
b.fromHalfOpenToOpen(1)
}
Expand Down
129 changes: 126 additions & 3 deletions core/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,15 @@ type StateChangeListenerMock struct {
}

func (s *StateChangeListenerMock) OnTransformToClosed(prev State, rule Rule) {
_ = s.Called(prev, rule)
logging.Debug("transform to closed", "strategy", rule.Strategy, "prevState", prev.String())
return
}

func (s *StateChangeListenerMock) OnTransformToOpen(prev State, rule Rule, snapshot interface{}) {
_ = s.Called(prev, rule, snapshot)
logging.Debug("transform to open", "strategy", rule.Strategy, "prevState", prev.String(), "snapshot", snapshot)
}

func (s *StateChangeListenerMock) OnTransformToHalfOpen(prev State, rule Rule) {
_ = s.Called(prev, rule)
logging.Debug("transform to Half-Open", "strategy", rule.Strategy, "prevState", prev.String())
}

Expand Down Expand Up @@ -140,6 +137,35 @@ func TestSlowRtCircuitBreaker_TryPass(t *testing.T) {
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
})

t.Run("TryPass_ProbeNum", func(t *testing.T) {
r := &Rule{
Resource: "abc",
Strategy: SlowRequestRatio,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 10000,
MaxAllowedRtMs: 50,
Threshold: 0.5,
ProbeNum: 10,
}
b, err := newSlowRtCircuitBreaker(r)
assert.Nil(t, err)

b.state.set(Open)
ctx := &base.EntryContext{
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
}
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
ctx.SetEntry(e)
for i := 0; i < 10; i++ {
pass := b.TryPass(ctx)
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
b.OnRequestComplete(1, nil)
}
assert.True(t, b.state.get() == Closed)
})
}

func TestSlowRt_OnRequestComplete(t *testing.T) {
Expand Down Expand Up @@ -169,6 +195,20 @@ func TestSlowRt_OnRequestComplete(t *testing.T) {
b.OnRequestComplete(10, nil)
assert.True(t, b.CurrentState() == Closed)
})
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(10, nil)
assert.True(t, b.CurrentState() == HalfOpen)
assert.True(t, b.curProbeNumber == 1)
})
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
assert.True(t, b.CurrentState() == Open)
assert.True(t, b.curProbeNumber == 0)
})
}

func TestSlowRt_ResetBucketTo(t *testing.T) {
Expand Down Expand Up @@ -227,6 +267,33 @@ func TestErrorRatioCircuitBreaker_TryPass(t *testing.T) {
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
})
t.Run("TryPass_ProbeNum", func(t *testing.T) {
r := &Rule{
Resource: "abc",
Strategy: ErrorRatio,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 10000,
Threshold: 0.5,
ProbeNum: 10,
}
b, err := newErrorRatioCircuitBreaker(r)
assert.Nil(t, err)

b.state.set(Open)
ctx := &base.EntryContext{
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
}
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
ctx.SetEntry(e)
for i := 0; i < 10; i++ {
pass := b.TryPass(ctx)
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
b.OnRequestComplete(1, nil)
}
assert.True(t, b.state.get() == Closed)
})
}

func TestErrorRatio_OnRequestComplete(t *testing.T) {
Expand Down Expand Up @@ -254,6 +321,20 @@ func TestErrorRatio_OnRequestComplete(t *testing.T) {
b.OnRequestComplete(0, errors.New("errorRatio"))
assert.True(t, b.CurrentState() == Open)
})
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
assert.True(t, b.CurrentState() == HalfOpen)
assert.True(t, b.curProbeNumber == 1)
})
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(0, errors.New("errorRatio"))
assert.True(t, b.CurrentState() == Open)
assert.True(t, b.curProbeNumber == 0)
})
}

func TestErrorRatio_ResetBucketTo(t *testing.T) {
Expand Down Expand Up @@ -312,6 +393,34 @@ func TestErrorCountCircuitBreaker_TryPass(t *testing.T) {
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
})

t.Run("TryPass_ProbeNum", func(t *testing.T) {
r := &Rule{
Resource: "abc",
Strategy: ErrorCount,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 10000,
Threshold: 1.0,
ProbeNum: 10,
}
b, err := newErrorCountCircuitBreaker(r)
assert.Nil(t, err)

b.state.set(Open)
ctx := &base.EntryContext{
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
}
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
ctx.SetEntry(e)
for i := 0; i < 10; i++ {
pass := b.TryPass(ctx)
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
b.OnRequestComplete(1, nil)
}
assert.True(t, b.state.get() == Closed)
})
}

func TestErrorCount_OnRequestComplete(t *testing.T) {
Expand Down Expand Up @@ -339,6 +448,20 @@ func TestErrorCount_OnRequestComplete(t *testing.T) {
b.OnRequestComplete(0, errors.New("errorCount"))
assert.True(t, b.CurrentState() == Open)
})
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
assert.True(t, b.CurrentState() == HalfOpen)
assert.True(t, b.curProbeNumber == 1)
})
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(0, errors.New("errorCount"))
assert.True(t, b.CurrentState() == Open)
assert.True(t, b.curProbeNumber == 0)
})
}

func TestFromClosedToOpen(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions core/circuitbreaker/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type Rule struct {
// for ErrorRatio, it represents the max error request ratio
// for ErrorCount, it represents the max error request count
Threshold float64 `json:"threshold"`
//ProbeNum is number of probes required when the circuit breaker is half-open.
//when the probe num are set and circuit breaker in the half-open state.
//if err occurs during the probe, the circuit breaker is opened immediately.
//otherwise,the circuit breaker is closed only after the number of probes is reached
ProbeNum uint64 `json:"probeNum"`
}

func (r *Rule) String() string {
Expand Down