Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the current Limit() on existing ratelimiters #6235

Merged
merged 6 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/persistence/wrappers/ratelimited/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package ratelimited
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
)
Expand All @@ -43,6 +45,10 @@ func (l limiterAlwaysAllow) Reserve() clock.Reservation {
return &reservationAlwaysAllow{}
}

func (l limiterAlwaysAllow) Limit() rate.Limit {
return rate.Inf
}

type limiterNeverAllow struct{}

func (l limiterNeverAllow) Allow() bool {
Expand All @@ -58,6 +64,10 @@ func (l limiterNeverAllow) Reserve() clock.Reservation {
return &reservationNeverAllow{}
}

func (l limiterNeverAllow) Limit() rate.Limit {
return 0
}

type reservationAlwaysAllow struct{}
type reservationNeverAllow struct{}

Expand Down
6 changes: 6 additions & 0 deletions common/quotas/dynamicratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)

Expand Down Expand Up @@ -63,3 +65,7 @@
d.rl.UpdateMaxDispatch(&rps)
return d.rl.Reserve()
}

func (d *DynamicRateLimiter) Limit() rate.Limit {
return rate.Limit(d.rps())

Check warning on line 70 in common/quotas/dynamicratelimiter.go

View check run for this annotation

Codecov / codecov/patch

common/quotas/dynamicratelimiter.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}
5 changes: 5 additions & 0 deletions common/quotas/global/collection/internal/counted.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"

"go.uber.org/atomic"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
Expand Down Expand Up @@ -80,6 +81,10 @@ func (c CountedLimiter) Reserve() clock.Reservation {
}
}

func (c CountedLimiter) Limit() rate.Limit {
return c.wrapped.Limit()
}

func (c CountedLimiter) Collect() UsageMetrics {
return c.usage.Collect()
}
Expand Down
7 changes: 7 additions & 0 deletions common/quotas/global/collection/internal/counted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
Expand Down Expand Up @@ -95,6 +96,12 @@ func TestUsage(t *testing.T) {
r.Used(false)
assert.Equal(t, UsageMetrics{0, 1, 0}, lim.Collect(), "not-allowed reservations count as rejection")
})
// largely for coverage
t.Run("supports Limit", func(t *testing.T) {
rps := rate.Limit(1)
lim := NewCountedLimiter(clock.NewMockRatelimiter(clock.NewMockedTimeSource(), rps, 1))
assert.Equal(t, rps, lim.Limit())
})
}

func TestRegression_ReserveCountsCorrectly(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions common/quotas/global/collection/internal/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func (b *FallbackLimiter) Reserve() clock.Reservation {
}
}

func (b *FallbackLimiter) Limit() rate.Limit {
if b.useFallback() {
return b.fallback.Limit()
}
return b.primary.Limit()
}

func (b *FallbackLimiter) both() quotas.Limiter {
if b.useFallback() {
return NewShadowedLimiter(b.fallback, b.primary)
Expand Down
5 changes: 5 additions & 0 deletions common/quotas/global/collection/internal/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func TestLimiterNotRacy(t *testing.T) {
lim.Reserve().Used(rand.Int()%2 == 0)
return nil
})
g.Go(func() error {
lim.Limit()
return nil
})
g.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond)
defer cancel()
Expand All @@ -187,6 +191,7 @@ type allowres struct{}
func (allowlimiter) Allow() bool { return true }
func (a allowlimiter) Wait(context.Context) error { return nil }
func (a allowlimiter) Reserve() clock.Reservation { return allowres{} }
func (a allowlimiter) Limit() rate.Limit { return rate.Inf }

func (a allowres) Allow() bool { return true }
func (a allowres) Used(bool) {}
6 changes: 6 additions & 0 deletions common/quotas/global/collection/internal/shadowed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package internal
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
)
Expand Down Expand Up @@ -79,6 +81,10 @@ func (s shadowedLimiter) Reserve() clock.Reservation {
}
}

func (s shadowedLimiter) Limit() rate.Limit {
return s.primary.Limit()
}

func (s shadowedReservation) Allow() bool {
_ = s.shadow.Allow()
return s.primary.Allow()
Expand Down
4 changes: 4 additions & 0 deletions common/quotas/global/collection/internal/shadowed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,8 @@ func TestShadowed(t *testing.T) {
})
})
})
t.Run("limit", func(t *testing.T) {
l := NewShadowedLimiter(&allowlimiter{}, quotas.NewSimpleRateLimiter(t, 0))
assert.Equal(t, rate.Inf, l.Limit(), "should return the primary limit, not shadowed")
})
}
8 changes: 8 additions & 0 deletions common/quotas/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package quotas
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)

Expand Down Expand Up @@ -56,6 +58,12 @@ type Limiter interface {

// Reserve reserves a rate limit token
Reserve() clock.Reservation

// Limit returns the current configured ratelimit.
//
// If this Limiter wraps multiple values, this is generally the "most relevant" one,
// i.e. the one that is most likely to apply to the next request
Limit() rate.Limit
}

// Policy corresponds to a quota policy. A policy allows implementing layered
Expand Down
15 changes: 15 additions & 0 deletions common/quotas/limiter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)
Expand All @@ -42,6 +43,17 @@ func TestNewRateLimiter(t *testing.T) {
rl := NewRateLimiter(&maxDispatch, time.Second, _minBurst)
limiter := rl.goRateLimiter.Load().(clock.Ratelimiter)
assert.Equal(t, _minBurst, limiter.Burst())
assert.Equal(t, maxDispatch, float64(limiter.Limit()))
}

func TestSimpleRatelimiter(t *testing.T) {
// largely for coverage, as this is a test-helper that is used in other packages
l := NewSimpleRateLimiter(t, 5)
assert.Equal(t, rate.Limit(5), l.Limit())
assert.True(t, l.Allow(), "should allow one request through")
updated := 3.0 // must be lower than current value or it will not update
l.UpdateMaxDispatch(&updated)
assert.Equal(t, rate.Limit(3), l.Limit(), "should have immediately updated to new lower value")
}

func TestMultiStageRateLimiterBlockedByDomainRps(t *testing.T) {
Expand Down
13 changes: 7 additions & 6 deletions common/quotas/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

import (
"context"
"math"
"sync"
"sync/atomic"
"testing"
"time"

"golang.org/x/time/rate"
Expand Down Expand Up @@ -58,8 +58,9 @@
}

// NewSimpleRateLimiter returns a new rate limiter backed by the golang rate
// limiter
func NewSimpleRateLimiter(rps int) *RateLimiter {
// limiter. This is currently only used in tests.
func NewSimpleRateLimiter(t *testing.T, rps int) *RateLimiter {
t.Helper() // ensure a T has been passed
Comment on lines +61 to +63
Copy link
Member Author

@Groxx Groxx Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems very likely that this will only ever be used in tests. almost all of our real limiters are dynamically configurable.

Copy link
Member

@davidporter-id-au davidporter-id-au Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider s/Simple/Test/ because it presumably means this has testing functionality and that makes it considerably easier to find when casting about for functions to use in tests

initialRps := float64(rps)
return NewRateLimiter(&initialRps, _defaultRPSTTL, _burstSize)
}
Expand Down Expand Up @@ -107,13 +108,13 @@
}

// Limit returns the current rate per second limit for this ratelimiter
func (rl *RateLimiter) Limit() float64 {
func (rl *RateLimiter) Limit() rate.Limit {
Comment on lines -110 to +111
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to change because this is also a quotas.Limiter, and the signature is incompatible.

rl.RLock()
defer rl.RUnlock()
if rl.maxDispatchPerSecond != nil {
return *rl.maxDispatchPerSecond
return rate.Limit(*rl.maxDispatchPerSecond)
}
return math.MaxFloat64
return rate.Inf

Check warning on line 117 in common/quotas/ratelimiter.go

View check run for this annotation

Codecov / codecov/patch

common/quotas/ratelimiter.go#L117

Added line #L117 was not covered by tests
}

func (rl *RateLimiter) storeLimiter(maxDispatchPerSecond *float64) {
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {

// Rate returns the current rate at which tasks are dispatched
func (tm *TaskMatcher) Rate() float64 {
return tm.limiter.Limit()
return float64(tm.limiter.Limit())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though I'd like to add this....... it leads to a massive explosion in "well this is also the same value, so change that too..." chain of changes, with no good place to cut things off.

I think changing all these is very much worth it, float64 is rather obviously a worse type. But I'm not tackling that right now.

}

func (tm *TaskMatcher) pollOrForward(
Expand Down
6 changes: 3 additions & 3 deletions service/worker/archiver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (s *clientSuite) SetupTest() {
log.NewNoop(),
nil,
dynamicconfig.GetIntPropertyFn(1000),
quotas.NewSimpleRateLimiter(1000),
quotas.NewSimpleRateLimiter(1),
quotas.NewSimpleRateLimiter(1),
quotas.NewSimpleRateLimiter(s.T(), 1000),
quotas.NewSimpleRateLimiter(s.T(), 1),
quotas.NewSimpleRateLimiter(s.T(), 1),
s.archiverProvider,
dynamicconfig.GetBoolPropertyFn(false),
).(*client)
Expand Down
Loading