Skip to content

Commit

Permalink
Eliminate cancellation deadlock in RateLimiter implementations (dotne…
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored Aug 10, 2023
1 parent b682a70 commit 219392e
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
return new ValueTask<RateLimitLease>(SuccessfulLease);
}

using var disposer = default(RequestRegistration.Disposer);

// Perf: Check SemaphoreSlim implementation instead of locking
lock (Lock)
{
Expand All @@ -152,7 +154,7 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
if (!oldestRequest.TrySetResult(FailedLease))
{
// Updating queue count is handled by the cancellation code
_queueCount += oldestRequest.Count;
Expand All @@ -161,7 +163,7 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
{
Interlocked.Increment(ref _failedLeasesCount);
}
oldestRequest.CancellationTokenRegistration.Dispose();
disposer.Add(oldestRequest);
}
while (_options.QueueLimit - _queueCount < permitCount);
}
Expand All @@ -173,22 +175,12 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, C
}
}

CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueState)obj!).TrySetCanceled();
}, tcs);
}

RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
var request = new RequestRegistration(permitCount, this, cancellationToken);
_queue.EnqueueTail(request);
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

return new ValueTask<RateLimitLease>(request.Tcs.Task);
return new ValueTask<RateLimitLease>(request.Task);
}
}

Expand Down Expand Up @@ -224,8 +216,15 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat
return false;
}

#if DEBUG
// for unit testing
internal event Action? ReleasePreHook;
internal event Action? ReleasePostHook;
#endif

private void Release(int releaseCount)
{
using var disposer = default(RequestRegistration.Disposer);
lock (Lock)
{
if (_disposed)
Expand All @@ -236,6 +235,10 @@ private void Release(int releaseCount)
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);

#if DEBUG
ReleasePreHook?.Invoke();
#endif

while (_queue.Count > 0)
{
RequestRegistration nextPendingRequest =
Expand All @@ -245,15 +248,21 @@ private void Release(int releaseCount)

// Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued.
// We just need to remove the item and let the next queued item be considered for completion.
if (nextPendingRequest.Tcs.Task.IsCompleted)
if (nextPendingRequest.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
disposer.Add(nextPendingRequest);
continue;
}
else if (_permitCount >= nextPendingRequest.Count)

#if DEBUG
ReleasePostHook?.Invoke();
#endif

if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
Expand All @@ -266,7 +275,7 @@ private void Release(int releaseCount)

ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
// Check if request was canceled
if (!nextPendingRequest.Tcs.TrySetResult(lease))
if (!nextPendingRequest.TrySetResult(lease))
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
Expand All @@ -277,7 +286,7 @@ private void Release(int releaseCount)
{
Interlocked.Increment(ref _successfulLeasesCount);
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
disposer.Add(nextPendingRequest);
Debug.Assert(_queueCount >= 0);
}
else
Expand All @@ -289,7 +298,6 @@ private void Release(int releaseCount)
if (_permitCount == _options.PermitLimit)
{
Debug.Assert(_idleSince is null);
Debug.Assert(_queueCount == 0);
_idleSince = Stopwatch.GetTimestamp();
}
}
Expand All @@ -303,6 +311,7 @@ protected override void Dispose(bool disposing)
return;
}

using var disposer = default(RequestRegistration.Disposer);
lock (Lock)
{
if (_disposed)
Expand All @@ -315,8 +324,8 @@ protected override void Dispose(bool disposing)
RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
next.Tcs.TrySetResult(FailedLease);
disposer.Add(next);
next.TrySetResult(FailedLease);
}
}
}
Expand Down Expand Up @@ -385,49 +394,68 @@ protected override void Dispose(bool disposing)
}
}

private readonly struct RequestRegistration
private sealed class RequestRegistration : TaskCompletionSource<RateLimitLease>
{
public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLease> tcs,
CancellationTokenRegistration cancellationTokenRegistration)
{
Count = requestedCount;
// Perf: Use AsyncOperation<TResult> instead
Tcs = tcs;
CancellationTokenRegistration = cancellationTokenRegistration;
}
private readonly CancellationToken _cancellationToken;
private CancellationTokenRegistration _cancellationTokenRegistration;

public int Count { get; }
// this field is used only by the disposal mechanics and never shared between threads
private RequestRegistration? _next;

public TaskCompletionSource<RateLimitLease> Tcs { get; }
public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken)
: base(limiter, TaskCreationOptions.RunContinuationsAsynchronously)
{
Count = permitCount;
_cancellationToken = cancellationToken;

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}
// RequestRegistration objects are created while the limiter lock is held
// if cancellationToken fires before or while the lock is held, UnsafeRegister
// is going to invoke the callback synchronously, but this does not create
// a deadlock because lock are reentrant
if (cancellationToken.CanBeCanceled)
#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER
_cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this);
#else
_cancellationTokenRegistration = cancellationToken.Register(Cancel, this);
#endif
}

private sealed class CancelQueueState : TaskCompletionSource<RateLimitLease>
{
private readonly int _permitCount;
private readonly ConcurrencyLimiter _limiter;
private readonly CancellationToken _cancellationToken;
public int Count { get; }

public CancelQueueState(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken)
: base(TaskCreationOptions.RunContinuationsAsynchronously)
private static void Cancel(object? state)
{
_permitCount = permitCount;
_limiter = limiter;
_cancellationToken = cancellationToken;
if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken))
{
var limiter = (ConcurrencyLimiter)registration.Task.AsyncState!;
lock (limiter.Lock)
{
limiter._queueCount -= registration.Count;
}
}
}

public new bool TrySetCanceled()
/// <summary>
/// Collects registrations to dispose outside the limiter lock to avoid deadlock.
/// </summary>
public struct Disposer : IDisposable
{
if (TrySetCanceled(_cancellationToken))
private RequestRegistration? _next;

public void Add(RequestRegistration request)
{
request._next = _next;
_next = request;
}

public void Dispose()
{
lock (_limiter.Lock)
for (var current = _next; current is not null; current = current._next)
{
_limiter._queueCount -= _permitCount;
current._cancellationTokenRegistration.Dispose();
}
return true;

_next = null;
}
return false;
}
}
}
Expand Down
Loading

0 comments on commit 219392e

Please sign in to comment.