From 402a7d0b11656b6b729149292a6d91dca6a45de9 Mon Sep 17 00:00:00 2001 From: Miha Zupan Date: Sat, 19 Oct 2024 21:25:16 +0200 Subject: [PATCH 1/2] Remove ConcurrentDictionary lookups from Unix socket event loop --- .../src/Resources/Strings.resx | 3 - .../Net/Sockets/SocketAsyncContext.Unix.cs | 7 +- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 67 ++++++++++--------- .../Net/Sockets/SocketAsyncEngine.Wasi.cs | 4 +- 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/Resources/Strings.resx b/src/libraries/System.Net.Sockets/src/Resources/Strings.resx index 7a0feca077c032..36faf4552f604c 100644 --- a/src/libraries/System.Net.Sockets/src/Resources/Strings.resx +++ b/src/libraries/System.Net.Sockets/src/Resources/Strings.resx @@ -312,9 +312,6 @@ System.Net.Sockets is not supported on this platform. - - Handle is already used by another Socket. - Provided SocketAddress is too small for given AddressFamily. diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 8463c5142b573c..bd8baa10aea392 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -1262,6 +1262,8 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName] private SocketAsyncEngine? _asyncEngine; private bool IsRegistered => _asyncEngine != null; private bool _isHandleNonBlocking = OperatingSystem.IsWasi(); // WASI sockets are always non-blocking, because we don't have another thread which could be blocked + /// Handle associated with a while . + internal GCHandle GCHandle; private readonly object _registerLock = new object(); @@ -1330,7 +1332,10 @@ public bool StopAndAbort() // We don't need to synchronize with Register. // This method is called when the handle gets released. // The Register method will throw ODE when it tries to use the handle at this point. - _asyncEngine?.UnregisterSocket(_socket.DangerousGetHandle(), this); + if (IsRegistered) + { + SocketAsyncEngine.UnregisterSocket(this); + } return aborted; } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 7405e579042232..accb104d8a9e9f 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -74,14 +74,20 @@ private static SocketAsyncEngine[] CreateEngines() return engines; } + /// + /// Each is assigned a while registered with a . + /// The handle is used as the to quickly map events to s. + /// When a socket is disposed and unregistered, there is a small race condition where we may still receive events with the + /// now-unused . Since we assume that the handle is always allocated in , + /// we can never these handles. + /// Instead, we reuse handles for new sockets. The race condition may therefore trigger a notification for the wrong socket, + /// but that is okay as those operations can be retried. + /// + private static readonly ConcurrentQueue s_gcHandles = new(); + private readonly IntPtr _port; private readonly Interop.Sys.SocketEvent* _buffer; - // - // Maps handle values to SocketAsyncContext instances. - // - private readonly ConcurrentDictionary _handleToContextMap = new ConcurrentDictionary(); - // // Queue of events generated by EventLoop() that would be processed by the thread pool // @@ -119,28 +125,39 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error) { - bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context)); - if (!added) + Debug.Assert(!context.GCHandle.IsAllocated); + + if (s_gcHandles.TryDequeue(out context.GCHandle)) + { + context.GCHandle.Target = context; + } + else { - // Using public SafeSocketHandle(IntPtr) a user can add the same handle - // from a different Socket instance. - throw new InvalidOperationException(SR.net_sockets_handle_already_used); + context.GCHandle = GCHandle.Alloc(context, GCHandleType.Normal); } error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None, - Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle); + Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, GCHandle.ToIntPtr(context.GCHandle)); if (error == Interop.Error.SUCCESS) { return true; } - _handleToContextMap.TryRemove(socketHandle, out _); + UnregisterSocket(context); return false; } - public void UnregisterSocket(IntPtr socketHandle, SocketAsyncContext __) + public static void UnregisterSocket(SocketAsyncContext context) { - _handleToContextMap.TryRemove(socketHandle, out _); + Debug.Assert(context.GCHandle.IsAllocated); + Debug.Assert(ReferenceEquals(context.GCHandle.Target, context)); + + GCHandle handle = context.GCHandle; + context.GCHandle = default; + + // See comments on s_gcHandles for why we're reusing the handle instead of freeing it. + handle.Target = null; + s_gcHandles.Enqueue(handle); } private SocketAsyncEngine() @@ -324,13 +341,11 @@ private readonly struct SocketEventHandler { public Interop.Sys.SocketEvent* Buffer { get; } - private readonly ConcurrentDictionary _handleToContextMap; private readonly ConcurrentQueue _eventQueue; public SocketEventHandler(SocketAsyncEngine engine) { Buffer = engine._buffer; - _handleToContextMap = engine._handleToContextMap; _eventQueue = engine._eventQueue; } @@ -340,10 +355,12 @@ public bool HandleSocketEvents(int numEvents) bool enqueuedEvent = false; foreach (var socketEvent in new ReadOnlySpan(Buffer, numEvents)) { - if (_handleToContextMap.TryGetValue(socketEvent.Data, out SocketAsyncContextWrapper contextWrapper)) - { - SocketAsyncContext context = contextWrapper.Context; + GCHandle handle = GCHandle.FromIntPtr(socketEvent.Data); + Debug.Assert(handle.IsAllocated); + Debug.Assert(handle.Target is null or SocketAsyncContext); + if (handle.IsAllocated && Unsafe.As(handle.Target) is { } context) + { if (context.PreferInlineCompletions) { context.HandleEventsInline(socketEvent.Events); @@ -365,18 +382,6 @@ public bool HandleSocketEvents(int numEvents) } } - // struct wrapper is used in order to improve the performance of the epoll thread hot path by up to 3% of some TechEmpower benchmarks - // the goal is to have a dedicated generic instantiation and using: - // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&) - // instead of: - // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&) - private readonly struct SocketAsyncContextWrapper - { - public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context; - - internal SocketAsyncContext Context { get; } - } - private readonly struct SocketIOEvent { public SocketAsyncContext Context { get; } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs index 3b69902260d812..0a39feb2699364 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Wasi.cs @@ -35,9 +35,7 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con return true; } -#pragma warning disable CA1822 - public void UnregisterSocket(IntPtr _, SocketAsyncContext context) -#pragma warning restore CA1822 + public static void UnregisterSocket(SocketAsyncContext context) { context.unregisterPollHook.Cancel(); } From b193fe3cf98f4bf82eb803f3e181e3bf866b6f83 Mon Sep 17 00:00:00 2001 From: Miha Zupan Date: Fri, 3 Jan 2025 12:27:44 +0100 Subject: [PATCH 2/2] Switch from GCHandle to simple array+index --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 4 +- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 70 ++++++++++++------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index bd8baa10aea392..4cc7a28fca8143 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -1262,8 +1262,8 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName] private SocketAsyncEngine? _asyncEngine; private bool IsRegistered => _asyncEngine != null; private bool _isHandleNonBlocking = OperatingSystem.IsWasi(); // WASI sockets are always non-blocking, because we don't have another thread which could be blocked - /// Handle associated with a while . - internal GCHandle GCHandle; + /// An index into 's table of all contexts that are currently . + internal int GlobalContextIndex = -1; private readonly object _registerLock = new object(); diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index accb104d8a9e9f..b67af69163bc5e 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -75,15 +76,12 @@ private static SocketAsyncEngine[] CreateEngines() } /// - /// Each is assigned a while registered with a . - /// The handle is used as the to quickly map events to s. - /// When a socket is disposed and unregistered, there is a small race condition where we may still receive events with the - /// now-unused . Since we assume that the handle is always allocated in , - /// we can never these handles. - /// Instead, we reuse handles for new sockets. The race condition may therefore trigger a notification for the wrong socket, - /// but that is okay as those operations can be retried. + /// Each is assigned an index into this table while registered with a . + /// The index is used as the to quickly map events to s. + /// It is also stored in so that we can efficiently remove it when unregistering the socket. /// - private static readonly ConcurrentQueue s_gcHandles = new(); + private static SocketAsyncContext?[] s_registeredContexts = []; + private static readonly Queue s_registeredContextsFreeList = []; private readonly IntPtr _port; private readonly Interop.Sys.SocketEvent* _buffer; @@ -125,19 +123,33 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error) { - Debug.Assert(!context.GCHandle.IsAllocated); + Debug.Assert(context.GlobalContextIndex == -1); - if (s_gcHandles.TryDequeue(out context.GCHandle)) + lock (s_registeredContextsFreeList) { - context.GCHandle.Target = context; - } - else - { - context.GCHandle = GCHandle.Alloc(context, GCHandleType.Normal); + if (!s_registeredContextsFreeList.TryDequeue(out int index)) + { + int previousLength = s_registeredContexts.Length; + int newLength = Math.Max(4, 2 * previousLength); + + Array.Resize(ref s_registeredContexts, newLength); + + for (int i = previousLength + 1; i < newLength; i++) + { + s_registeredContextsFreeList.Enqueue(i); + } + + index = previousLength; + } + + Debug.Assert(s_registeredContexts[index] is null); + + s_registeredContexts[index] = context; + context.GlobalContextIndex = index; } error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None, - Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, GCHandle.ToIntPtr(context.GCHandle)); + Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, context.GlobalContextIndex); if (error == Interop.Error.SUCCESS) { return true; @@ -149,15 +161,16 @@ private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, ou public static void UnregisterSocket(SocketAsyncContext context) { - Debug.Assert(context.GCHandle.IsAllocated); - Debug.Assert(ReferenceEquals(context.GCHandle.Target, context)); + Debug.Assert(context.GlobalContextIndex >= 0); + Debug.Assert(ReferenceEquals(s_registeredContexts[context.GlobalContextIndex], context)); - GCHandle handle = context.GCHandle; - context.GCHandle = default; + lock (s_registeredContextsFreeList) + { + s_registeredContexts[context.GlobalContextIndex] = null; + s_registeredContextsFreeList.Enqueue(context.GlobalContextIndex); + } - // See comments on s_gcHandles for why we're reusing the handle instead of freeing it. - handle.Target = null; - s_gcHandles.Enqueue(handle); + context.GlobalContextIndex = -1; } private SocketAsyncEngine() @@ -355,11 +368,14 @@ public bool HandleSocketEvents(int numEvents) bool enqueuedEvent = false; foreach (var socketEvent in new ReadOnlySpan(Buffer, numEvents)) { - GCHandle handle = GCHandle.FromIntPtr(socketEvent.Data); - Debug.Assert(handle.IsAllocated); - Debug.Assert(handle.Target is null or SocketAsyncContext); + Debug.Assert((uint)socketEvent.Data < (uint)s_registeredContexts.Length); + + // The context may be null if the socket was unregistered right before the event was processed. + // The slot in s_registeredContexts may have been reused by a different context, in which case the + // incorrect socket will notice that no information is available yet and harmlessly retry, waiting for new events. + SocketAsyncContext? context = s_registeredContexts[(uint)socketEvent.Data]; - if (handle.IsAllocated && Unsafe.As(handle.Target) is { } context) + if (context is not null) { if (context.PreferInlineCompletions) {