Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ConcurrentDictionary lookups from Unix socket event loop #109052

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/libraries/System.Net.Sockets/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,6 @@
<data name="SystemNetSockets_PlatformNotSupported" xml:space="preserve">
<value>System.Net.Sockets is not supported on this platform.</value>
</data>
<data name="net_sockets_handle_already_used" xml:space="preserve">
<value>Handle is already used by another Socket.</value>
</data>
<data name="net_sockets_address_small" xml:space="preserve">
<value>Provided SocketAddress is too small for given AddressFamily.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,8 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName]
private SocketAsyncEngine? _asyncEngine;
private bool IsRegistered => _asyncEngine != null;
private bool _isHandleNonBlocking = OperatingSystem.IsWasi(); // WASI sockets are always non-blocking, because we don't have another thread which could be blocked
/// <summary>An index into <see cref="SocketAsyncEngine"/>'s table of all contexts that are currently <see cref="IsRegistered"/>.</summary>
internal int GlobalContextIndex = -1;

private readonly object _registerLock = new object();

Expand Down Expand Up @@ -1330,7 +1332,10 @@ public bool StopAndAbort()
// We don't need to synchronize with Register.
// This method is called when the handle gets released.
// The Register method will throw ODE when it tries to use the handle at this point.
_asyncEngine?.UnregisterSocket(_socket.DangerousGetHandle(), this);
if (IsRegistered)
{
SocketAsyncEngine.UnregisterSocket(this);
}

return aborted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -74,14 +75,17 @@ private static SocketAsyncEngine[] CreateEngines()
return engines;
}

/// <summary>
/// Each <see cref="SocketAsyncContext"/> is assigned an index into this table while registered with a <see cref="SocketAsyncEngine"/>.
/// <para>The index is used as the <see cref="Interop.Sys.SocketEvent.Data"/> to quickly map events to <see cref="SocketAsyncContext"/>s.</para>
/// <para>It is also stored in <see cref="SocketAsyncContext.GlobalContextIndex"/> so that we can efficiently remove it when unregistering the socket.</para>
/// </summary>
private static SocketAsyncContext?[] s_registeredContexts = [];
private static readonly Queue<int> s_registeredContextsFreeList = [];

private readonly IntPtr _port;
private readonly Interop.Sys.SocketEvent* _buffer;

//
// Maps handle values to SocketAsyncContext instances.
//
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();

//
// Queue of events generated by EventLoop() that would be processed by the thread pool
//
Expand Down Expand Up @@ -119,28 +123,54 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con

private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error)
{
bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context));
if (!added)
Debug.Assert(context.GlobalContextIndex == -1);

lock (s_registeredContextsFreeList)
{
// Using public SafeSocketHandle(IntPtr) a user can add the same handle
// from a different Socket instance.
throw new InvalidOperationException(SR.net_sockets_handle_already_used);
if (!s_registeredContextsFreeList.TryDequeue(out int index))
{
int previousLength = s_registeredContexts.Length;
int newLength = Math.Max(4, 2 * previousLength);

Array.Resize(ref s_registeredContexts, newLength);

for (int i = previousLength + 1; i < newLength; i++)
{
s_registeredContextsFreeList.Enqueue(i);
}

index = previousLength;
}

Debug.Assert(s_registeredContexts[index] is null);

s_registeredContexts[index] = context;
context.GlobalContextIndex = index;
}

error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle);
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, context.GlobalContextIndex);
if (error == Interop.Error.SUCCESS)
{
return true;
}

_handleToContextMap.TryRemove(socketHandle, out _);
UnregisterSocket(context);
return false;
}

public void UnregisterSocket(IntPtr socketHandle, SocketAsyncContext __)
public static void UnregisterSocket(SocketAsyncContext context)
{
_handleToContextMap.TryRemove(socketHandle, out _);
Debug.Assert(context.GlobalContextIndex >= 0);
Debug.Assert(ReferenceEquals(s_registeredContexts[context.GlobalContextIndex], context));

lock (s_registeredContextsFreeList)
{
s_registeredContexts[context.GlobalContextIndex] = null;
s_registeredContextsFreeList.Enqueue(context.GlobalContextIndex);
}

context.GlobalContextIndex = -1;
}

private SocketAsyncEngine()
Expand Down Expand Up @@ -324,13 +354,11 @@ private readonly struct SocketEventHandler
{
public Interop.Sys.SocketEvent* Buffer { get; }

private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap;
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue;

public SocketEventHandler(SocketAsyncEngine engine)
{
Buffer = engine._buffer;
_handleToContextMap = engine._handleToContextMap;
_eventQueue = engine._eventQueue;
}

Expand All @@ -340,10 +368,15 @@ public bool HandleSocketEvents(int numEvents)
bool enqueuedEvent = false;
foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(Buffer, numEvents))
{
if (_handleToContextMap.TryGetValue(socketEvent.Data, out SocketAsyncContextWrapper contextWrapper))
{
SocketAsyncContext context = contextWrapper.Context;
Debug.Assert((uint)socketEvent.Data < (uint)s_registeredContexts.Length);

// The context may be null if the socket was unregistered right before the event was processed.
// The slot in s_registeredContexts may have been reused by a different context, in which case the
// incorrect socket will notice that no information is available yet and harmlessly retry, waiting for new events.
SocketAsyncContext? context = s_registeredContexts[(uint)socketEvent.Data];

if (context is not null)
{
if (context.PreferInlineCompletions)
{
context.HandleEventsInline(socketEvent.Events);
Expand All @@ -365,18 +398,6 @@ public bool HandleSocketEvents(int numEvents)
}
}

// struct wrapper is used in order to improve the performance of the epoll thread hot path by up to 3% of some TechEmpower benchmarks
// the goal is to have a dedicated generic instantiation and using:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
// instead of:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
private readonly struct SocketAsyncContextWrapper
{
public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context;

internal SocketAsyncContext Context { get; }
}

private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con
return true;
}

#pragma warning disable CA1822
public void UnregisterSocket(IntPtr _, SocketAsyncContext context)
#pragma warning restore CA1822
public static void UnregisterSocket(SocketAsyncContext context)
{
context.unregisterPollHook.Cancel();
}
Expand Down
Loading