Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update RateLimiter queues on cancellation #64825

Merged
merged 3 commits into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
}
}

TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
WrappedTCS tcs = new WrappedTCS(permitCount, this);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((TaskCompletionSource<RateLimitLease>)obj!).TrySetException(new OperationCanceledException());
((WrappedTCS)obj!).TryCancel();
}, tcs);
}

Expand Down Expand Up @@ -194,7 +194,6 @@ private void Release(int releaseCount)

_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_queueCount >= 0);
Debug.Assert(_permitCount >= 0);

ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
Expand All @@ -203,8 +202,11 @@ private void Release(int releaseCount)
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
// Updating queue count is handled by the cancellation code
_queueCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
Debug.Assert(_queueCount >= 0);
}
else
{
Expand Down Expand Up @@ -319,5 +321,28 @@ public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLea

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}

private class WrappedTCS : TaskCompletionSource<RateLimitLease>
{
private readonly int PermitCount;
private readonly ConcurrencyLimiter Limiter;

public WrappedTCS(int permitCount, ConcurrencyLimiter limiter) : base(TaskCreationOptions.RunContinuationsAsynchronously)
{
PermitCount = permitCount;
Limiter = limiter;
}

public void TryCancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: there's a minor (internal) usability issue here, as now there's a functional difference between TryCancel and TrySetCanceled, which sound very similar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the recommended approach be to new override TrySetCanceled?

{
if (TrySetException(new OperationCanceledException()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this rather than TrySetCanceled()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's some previous discussion: aspnet/AspLabs#387 (comment)

Given that we're doing return new ValueTask<RateLimitLease>(Task.FromCanceled<RateLimitLease>(cancellationToken)); instead of calling ThrowIfCancellationRequested(), that's even more reason to use TrySetCanceled() now here.

We should probably capture the cancellationToken too and call the TrySetCanceled(CancellationToken) overload.

{
lock (Limiter.Lock)
{
Limiter._queueCount -= PermitCount;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,13 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
}
}

TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);

WrappedTCS tcs = new WrappedTCS(tokenCount, this);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((TaskCompletionSource<RateLimitLease>)obj!).TrySetException(new OperationCanceledException());
((WrappedTCS)obj!).TryCancel();
}, tcs);
}

Expand All @@ -140,7 +139,6 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
_queueCount += tokenCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

// handle cancellation
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}
}
Expand Down Expand Up @@ -276,15 +274,17 @@ private void ReplenishInternal(uint nowTicks)

_queueCount -= nextPendingRequest.Count;
_tokenCount -= nextPendingRequest.Count;
Debug.Assert(_queueCount >= 0);
Debug.Assert(_tokenCount >= 0);

if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
{
// Queued item was canceled so add count back
_tokenCount += nextPendingRequest.Count;
// Updating queue count is handled by the cancellation code
_queueCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
Debug.Assert(_queueCount >= 0);
}
else
{
Expand Down Expand Up @@ -380,7 +380,29 @@ public RequestRegistration(int tokenCount, TaskCompletionSource<RateLimitLease>
public TaskCompletionSource<RateLimitLease> Tcs { get; }

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}

private class WrappedTCS : TaskCompletionSource<RateLimitLease>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this type be moved to the base class? So it's doubled from ConcurrencyLimiter (almost).

Copy link
Member Author

@BrennanConroy BrennanConroy Feb 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of similar code between TokenBucket and ConcurrencyLimiter, planning on sharing a lot of it in a future change.

{
private readonly int TokenCount;
private readonly TokenBucketRateLimiter Limiter;

public WrappedTCS(int tokenCount, TokenBucketRateLimiter limiter) : base(TaskCreationOptions.RunContinuationsAsynchronously)
{
TokenCount = tokenCount;
Limiter = limiter;
}

public void TryCancel()
{
if (TrySetException(new OperationCanceledException()))
{
lock (Limiter.Lock)
{
Limiter._queueCount -= TokenCount;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public abstract class BaseRateLimiterTests
[Fact]
public abstract Task CanCancelWaitAsyncBeforeQueuing();

[Fact]
public abstract Task CancelUpdatesQueueLimit();

[Fact]
public abstract Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,27 @@ public override async Task CanCancelWaitAsyncBeforeQueuing()
Assert.Equal(1, limiter.GetAvailablePermits());
}

[Fact]
public override async Task CancelUpdatesQueueLimit()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
var lease = limiter.Acquire(1);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.WaitAsync(1, cts.Token);

cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(() => wait.AsTask());

wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

lease.Dispose();
lease = await wait;
Assert.True(lease.IsAcquired);
}

[Fact]
public override void NoMetadataOnAcquiredLease()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,28 @@ public override async Task CanCancelWaitAsyncBeforeQueuing()
Assert.Equal(1, limiter.GetAvailablePermits());
}

[Fact]
public override async Task CancelUpdatesQueueLimit()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
TimeSpan.Zero, 1, autoReplenishment: false));
var lease = limiter.Acquire(1);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.WaitAsync(1, cts.Token);

cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(() => wait.AsTask());

wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

limiter.TryReplenish();
lease = await wait;
Assert.True(lease.IsAcquired);
}

[Fact]
public override void NoMetadataOnAcquiredLease()
{
Expand Down