From 273f708111b3c8decce47363cda6935052641c8f Mon Sep 17 00:00:00 2001 From: Andrew Robinson Date: Wed, 1 Feb 2017 22:55:49 -0500 Subject: [PATCH] Fix transient test #18 --- src/KafkaClient.Tests/Unit/TransportTests.cs | 10 +- src/KafkaClient.Tests/project.json | 4 +- src/KafkaClient.Tests/project.lock.json | 56 +++++------ src/KafkaClient/Common/Extensions.cs | 12 ++- .../Connections/ReconnectingSocket.cs | 92 ++++++++++--------- 5 files changed, 87 insertions(+), 87 deletions(-) diff --git a/src/KafkaClient.Tests/Unit/TransportTests.cs b/src/KafkaClient.Tests/Unit/TransportTests.cs index db54aafe..2bf9964d 100644 --- a/src/KafkaClient.Tests/Unit/TransportTests.cs +++ b/src/KafkaClient.Tests/Unit/TransportTests.cs @@ -255,12 +255,12 @@ public async Task ReadShouldThrowServerDisconnectedExceptionWhenDisconnected() [Test] public async Task WhenNoConnectionThrowSocketExceptionAfterMaxRetry() { - var reconnectionAttempt = 0; - const int maxAttempts = 3; + var connectionAttempts = 0; + const int maxRetries = 2; var endpoint = TestConfig.ServerEndpoint(); var config = new ConnectionConfiguration( - Retry.AtMost(maxAttempts), - onConnecting: (e, attempt, elapsed) => Interlocked.Increment(ref reconnectionAttempt) + Retry.AtMost(maxRetries), + onConnecting: (e, attempt, elapsed) => Interlocked.Increment(ref connectionAttempts) ); using (var transport = new SocketTransport(endpoint, config, TestConfig.Log)) { try { @@ -269,7 +269,7 @@ public async Task WhenNoConnectionThrowSocketExceptionAfterMaxRetry() } catch (ConnectionException) { // expected } - Assert.That(reconnectionAttempt, Is.EqualTo(maxAttempts + 1)); + Assert.That(connectionAttempts, Is.EqualTo(1 + maxRetries)); } } diff --git a/src/KafkaClient.Tests/project.json b/src/KafkaClient.Tests/project.json index ebb243a6..573b5a9f 100644 --- a/src/KafkaClient.Tests/project.json +++ b/src/KafkaClient.Tests/project.json @@ -13,9 +13,9 @@ "Nito.AsyncEx.Coordination": "1.0.2", "NSubstitute": "2.0.0-rc", - "NUnit": "3.6.0", + "NUnit": "3.5.0", "System.Threading.Tasks.Parallel": "4.3.0", - "dotnet-test-nunit": "3.4.0-beta-3" + "dotnet-test-nunit": "3.4.0-beta-2" }, "testRunner": "nunit", diff --git a/src/KafkaClient.Tests/project.lock.json b/src/KafkaClient.Tests/project.lock.json index 7b8ddbf5..2dd1b2f1 100644 --- a/src/KafkaClient.Tests/project.lock.json +++ b/src/KafkaClient.Tests/project.lock.json @@ -37,13 +37,13 @@ "lib/netstandard1.3/Castle.Core.dll": {} } }, - "dotnet-test-nunit/3.4.0-beta-3": { + "dotnet-test-nunit/3.4.0-beta-2": { "type": "package", "dependencies": { "Microsoft.Extensions.DependencyModel": "1.0.0", "Microsoft.Extensions.Testing.Abstractions": "1.0.0-preview2-003121", "Microsoft.NETCore.App": "1.0.0", - "NUnit.Portable.Agent": "3.4.0-beta-3" + "NUnit.Portable.Agent": "3.4.0-beta-2" }, "compile": { "lib/netcoreapp1.0/dotnet-test-nunit.dll": {} @@ -730,20 +730,16 @@ "lib/netstandard1.0/NuGet.Versioning.dll": {} } }, - "NUnit/3.6.0": { + "NUnit/3.5.0": { "type": "package", - "dependencies": { - "NETStandard.Library": "1.6.0", - "System.Runtime.Loader": "4.0.0" - }, "compile": { - "lib/netstandard1.6/nunit.framework.dll": {} + "lib/dotnet/nunit.framework.dll": {} }, "runtime": { - "lib/netstandard1.6/nunit.framework.dll": {} + "lib/dotnet/nunit.framework.dll": {} } }, - "NUnit.Portable.Agent/3.4.0-beta-3": { + "NUnit.Portable.Agent/3.4.0-beta-2": { "type": "package", "dependencies": { "System.Globalization": "4.0.11", @@ -2862,12 +2858,12 @@ "readme.txt" ] }, - "dotnet-test-nunit/3.4.0-beta-3": { - "sha512": "TCprFJd60JQrNSygKAinVFBF2zQK3V2BRm1jeBvAGKF/6omeMrjO1tBD3UKlZSGvHFNyiocPn7bgU68cNz94ug==", + "dotnet-test-nunit/3.4.0-beta-2": { + "sha512": "ImRqxsZMFzn+bM33emu5ZMzIUe8GOHPxdA2NkUIsawWd4YjvcFzdIo5iNuejlY0lw1M+RMwRbpd/cKkhKft52g==", "type": "package", - "path": "dotnet-test-nunit/3.4.0-beta-3", + "path": "dotnet-test-nunit/3.4.0-beta-2", "files": [ - "dotnet-test-nunit.3.4.0-beta-3.nupkg.sha512", + "dotnet-test-nunit.3.4.0-beta-2.nupkg.sha512", "dotnet-test-nunit.nuspec", "lib/net451/dotnet-test-nunit.exe", "lib/netcoreapp1.0/dotnet-test-nunit.dll", @@ -3512,20 +3508,18 @@ "lib/netstandard1.0/NuGet.Versioning.xml" ] }, - "NUnit/3.6.0": { - "sha512": "mqJP3DI3mE9YSx0o7R5loyIm0NnLJLSl2lmXn84gSkOmLWzlbrxS0oW/cCZF25tB+6Anday/mbkRUXmdvEvSCg==", + "NUnit/3.5.0": { + "sha512": "z/03NfC+ci3/OzlIlClH+nd0LZ99OU3wS74nAtLVnPo1oq6nvd+CGxTHZAc8n8xxDLeLMofZ8AjahxlROEhf/Q==", "type": "package", - "path": "NUnit/3.6.0", + "path": "NUnit/3.5.0", "files": [ "CHANGES.txt", "LICENSE.txt", "NOTICES.txt", - "NUnit.3.6.0.nupkg.sha512", + "NUnit.3.5.0.nupkg.sha512", "NUnit.nuspec", - "lib/MonoAndroid/nunit.framework.dll", - "lib/MonoAndroid/nunit.framework.xml", - "lib/Xamarin.iOS10/nunit.framework.dll", - "lib/Xamarin.iOS10/nunit.framework.xml", + "lib/dotnet/nunit.framework.dll", + "lib/dotnet/nunit.framework.xml", "lib/net20/NUnit.System.Linq.dll", "lib/net20/nunit.framework.dll", "lib/net20/nunit.framework.xml", @@ -3535,18 +3529,16 @@ "lib/net40/nunit.framework.xml", "lib/net45/nunit.framework.dll", "lib/net45/nunit.framework.xml", - "lib/netstandard1.6/nunit.framework.dll", - "lib/netstandard1.6/nunit.framework.xml", - "lib/portable-net45+win8+wp8+wpa81/nunit.framework.dll", - "lib/portable-net45+win8+wp8+wpa81/nunit.framework.xml" + "lib/portable-net45+win8+wp8+wpa81+Xamarin.Mac+MonoAndroid10+MonoTouch10+Xamarin.iOS10/nunit.framework.dll", + "lib/portable-net45+win8+wp8+wpa81+Xamarin.Mac+MonoAndroid10+MonoTouch10+Xamarin.iOS10/nunit.framework.xml" ] }, - "NUnit.Portable.Agent/3.4.0-beta-3": { - "sha512": "BcgxOFtivkK8+eu5krwxxTWw6RtASfp7q/nQ18Z5a2rEvDHZz9h+U2fJjIrw4P9CtqbzZwNuGHprUnhoVqlZrw==", + "NUnit.Portable.Agent/3.4.0-beta-2": { + "sha512": "91XarG692I/gCwDi3d1k5tRc1J4mPo6l6nHrYRKWSgc9M2a0SX1YNS/uJwth3bXQEn+499mdZ02NTuvJstRq1A==", "type": "package", - "path": "NUnit.Portable.Agent/3.4.0-beta-3", + "path": "NUnit.Portable.Agent/3.4.0-beta-2", "files": [ - "NUnit.Portable.Agent.3.4.0-beta-3.nupkg.sha512", + "NUnit.Portable.Agent.3.4.0-beta-2.nupkg.sha512", "NUnit.Portable.Agent.nuspec", "lib/net45/NUnit.Portable.Agent.dll", "lib/netstandard1.3/NUnit.Portable.Agent.dll" @@ -8331,12 +8323,12 @@ "KafkaClient", "KafkaClient.Testing", "NSubstitute >= 2.0.0-rc", - "NUnit >= 3.6.0", + "NUnit >= 3.5.0", "Nito.AsyncEx.Coordination >= 1.0.2", "System.Collections.Immutable >= 1.3.1", "System.Net.Sockets >= 4.3.0", "System.Threading.Tasks.Parallel >= 4.3.0", - "dotnet-test-nunit >= 3.4.0-beta-3" + "dotnet-test-nunit >= 3.4.0-beta-2" ], ".NETCoreApp,Version=v1.1": [ "Microsoft.NETCore.App >= 1.1.0-*", diff --git a/src/KafkaClient/Common/Extensions.cs b/src/KafkaClient/Common/Extensions.cs index fce834a2..f3bc2e60 100644 --- a/src/KafkaClient/Common/Extensions.cs +++ b/src/KafkaClient/Common/Extensions.cs @@ -435,15 +435,17 @@ public static async Task ThrowIfCancellationRequested(this Task task, C return await task.ConfigureAwait(false); } - public static async Task IsCancelled(this Task task, CancellationToken cancellationToken) + public static async Task ThrowIfCancellationRequested(this Task task, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); - using (cancellationToken.Register(_ => ((TaskCompletionSource)_).TrySetResult(true), tcs)) { - if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)) { - return true; + using (cancellationToken.Register(_ => ((TaskCompletionSource)_).TrySetResult(true), tcs)) + { + if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)) + { + throw new OperationCanceledException(cancellationToken); } } - return false; + await task.ConfigureAwait(false); } #endregion diff --git a/src/KafkaClient/Connections/ReconnectingSocket.cs b/src/KafkaClient/Connections/ReconnectingSocket.cs index 80f568f1..dacfda5b 100644 --- a/src/KafkaClient/Connections/ReconnectingSocket.cs +++ b/src/KafkaClient/Connections/ReconnectingSocket.cs @@ -14,6 +14,7 @@ internal class ReconnectingSocket : IDisposable private readonly IConnectionConfiguration _configuration; private int _disposeCount; // = 0; + private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); private Socket _socket; private readonly SemaphoreSlim _connectSemaphore = new SemaphoreSlim(1, 1); @@ -68,55 +69,60 @@ public async Task ConnectAsync(CancellationToken cancellationToken) if (_disposeCount > 0) throw new ObjectDisposedException(nameof(ReconnectingSocket)); if (_socket?.Connected ?? cancellationToken.IsCancellationRequested) return _socket; - return await _connectSemaphore.LockAsync( - async () => { - if (_socket?.Connected ?? cancellationToken.IsCancellationRequested) return _socket; - var socket = _socket ?? CreateSocket(); - _socket = await _configuration.ConnectionRetry.TryAsync( - //action - async (attempt, timer) => { - if (cancellationToken.IsCancellationRequested) return RetryAttempt.Abort; - - _log.Info(() => LogEvent.Create($"Connecting to {_endpoint}")); - _configuration.OnConnecting?.Invoke(_endpoint, attempt, timer.Elapsed); - - await socket.ConnectAsync(_endpoint.Ip.Address, _endpoint.Ip.Port).ConfigureAwait(false); - if (!socket.Connected) return RetryAttempt.Retry; - - _log.Info(() => LogEvent.Create($"Connection established to {_endpoint}")); - _configuration.OnConnected?.Invoke(_endpoint, attempt, timer.Elapsed); - return new RetryAttempt(socket); - }, - (attempt, retry) => _log.Warn(() =>LogEvent.Create($"Failed connection to {_endpoint}: Will retry in {retry}")), - attempt => { - _log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint} on attempt {attempt}")); - throw new ConnectionException(_endpoint); - }, - (ex, attempt, retry) => { - if (_disposeCount > 0) throw new ObjectDisposedException(nameof(ReconnectingSocket), ex); - _log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint}: Will retry in {retry}")); - - if (ex is ObjectDisposedException || ex is PlatformNotSupportedException) { - Disconnect(); - _log.Info(() => LogEvent.Create($"Creating new socket to {_endpoint}")); - socket = CreateSocket(); - } - }, - (ex, attempt) => { - _log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint} on attempt {attempt}")); - if (ex is SocketException || ex is PlatformNotSupportedException) { - throw new ConnectionException(_endpoint, ex); - } - }, - cancellationToken).ConfigureAwait(false); - return _socket; - }, cancellationToken).ConfigureAwait(false); + using (var cancellation = CancellationTokenSource.CreateLinkedTokenSource(_disposeToken.Token, cancellationToken)) { + return await _connectSemaphore.LockAsync( + async () => { + if (_socket?.Connected ?? cancellation.Token.IsCancellationRequested) return _socket; + var socket = _socket ?? CreateSocket(); + _socket = await _configuration.ConnectionRetry.TryAsync( + //action + async (attempt, timer) => { + if (cancellation.Token.IsCancellationRequested) return RetryAttempt.Abort; + + _log.Info(() => LogEvent.Create($"Connecting to {_endpoint}")); + _configuration.OnConnecting?.Invoke(_endpoint, attempt, timer.Elapsed); + + await socket.ConnectAsync(_endpoint.Ip.Address, _endpoint.Ip.Port).ThrowIfCancellationRequested(cancellation.Token).ConfigureAwait(false); + if (!socket.Connected) return RetryAttempt.Retry; + + _log.Info(() => LogEvent.Create($"Connection established to {_endpoint}")); + _configuration.OnConnected?.Invoke(_endpoint, attempt, timer.Elapsed); + return new RetryAttempt(socket); + }, + (attempt, retry) => _log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint}: Will retry in {retry}")), + attempt => { + _log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint} on attempt {attempt}")); + throw new ConnectionException(_endpoint); + }, + (ex, attempt, retry) => { + if (_disposeCount > 0) throw new ObjectDisposedException(nameof(ReconnectingSocket), ex); + _log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint}: Will retry in {retry}")); + + if (ex is ObjectDisposedException || ex is PlatformNotSupportedException) + { + Disconnect(); + _log.Info(() => LogEvent.Create($"Creating new socket to {_endpoint}")); + socket = CreateSocket(); + } + }, + (ex, attempt) => { + _log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint} on attempt {attempt}")); + if (ex is SocketException || ex is PlatformNotSupportedException) + { + throw new ConnectionException(_endpoint, ex); + } + }, + cancellation.Token).ConfigureAwait(false); + return _socket; + }, cancellation.Token).ConfigureAwait(false); + } } public void Dispose() { if (Interlocked.Increment(ref _disposeCount) != 1) return; + _disposeToken.Cancel(); _connectSemaphore.Dispose(); Disconnect(); }