Skip to content

Commit

Permalink
Fix transient test #18
Browse files Browse the repository at this point in the history
  • Loading branch information
awr committed Feb 2, 2017
1 parent 75199f3 commit 273f708
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 87 deletions.
10 changes: 5 additions & 5 deletions src/KafkaClient.Tests/Unit/TransportTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,12 @@ public async Task ReadShouldThrowServerDisconnectedExceptionWhenDisconnected()
[Test]
public async Task WhenNoConnectionThrowSocketExceptionAfterMaxRetry()
{
var reconnectionAttempt = 0;
const int maxAttempts = 3;
var connectionAttempts = 0;
const int maxRetries = 2;
var endpoint = TestConfig.ServerEndpoint();
var config = new ConnectionConfiguration(
Retry.AtMost(maxAttempts),
onConnecting: (e, attempt, elapsed) => Interlocked.Increment(ref reconnectionAttempt)
Retry.AtMost(maxRetries),
onConnecting: (e, attempt, elapsed) => Interlocked.Increment(ref connectionAttempts)
);
using (var transport = new SocketTransport(endpoint, config, TestConfig.Log)) {
try {
Expand All @@ -269,7 +269,7 @@ public async Task WhenNoConnectionThrowSocketExceptionAfterMaxRetry()
} catch (ConnectionException) {
// expected
}
Assert.That(reconnectionAttempt, Is.EqualTo(maxAttempts + 1));
Assert.That(connectionAttempts, Is.EqualTo(1 + maxRetries));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/KafkaClient.Tests/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
"Nito.AsyncEx.Coordination": "1.0.2",

"NSubstitute": "2.0.0-rc",
"NUnit": "3.6.0",
"NUnit": "3.5.0",
"System.Threading.Tasks.Parallel": "4.3.0",
"dotnet-test-nunit": "3.4.0-beta-3"
"dotnet-test-nunit": "3.4.0-beta-2"
},
"testRunner": "nunit",

Expand Down
56 changes: 24 additions & 32 deletions src/KafkaClient.Tests/project.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@
"lib/netstandard1.3/Castle.Core.dll": {}
}
},
"dotnet-test-nunit/3.4.0-beta-3": {
"dotnet-test-nunit/3.4.0-beta-2": {
"type": "package",
"dependencies": {
"Microsoft.Extensions.DependencyModel": "1.0.0",
"Microsoft.Extensions.Testing.Abstractions": "1.0.0-preview2-003121",
"Microsoft.NETCore.App": "1.0.0",
"NUnit.Portable.Agent": "3.4.0-beta-3"
"NUnit.Portable.Agent": "3.4.0-beta-2"
},
"compile": {
"lib/netcoreapp1.0/dotnet-test-nunit.dll": {}
Expand Down Expand Up @@ -730,20 +730,16 @@
"lib/netstandard1.0/NuGet.Versioning.dll": {}
}
},
"NUnit/3.6.0": {
"NUnit/3.5.0": {
"type": "package",
"dependencies": {
"NETStandard.Library": "1.6.0",
"System.Runtime.Loader": "4.0.0"
},
"compile": {
"lib/netstandard1.6/nunit.framework.dll": {}
"lib/dotnet/nunit.framework.dll": {}
},
"runtime": {
"lib/netstandard1.6/nunit.framework.dll": {}
"lib/dotnet/nunit.framework.dll": {}
}
},
"NUnit.Portable.Agent/3.4.0-beta-3": {
"NUnit.Portable.Agent/3.4.0-beta-2": {
"type": "package",
"dependencies": {
"System.Globalization": "4.0.11",
Expand Down Expand Up @@ -2862,12 +2858,12 @@
"readme.txt"
]
},
"dotnet-test-nunit/3.4.0-beta-3": {
"sha512": "TCprFJd60JQrNSygKAinVFBF2zQK3V2BRm1jeBvAGKF/6omeMrjO1tBD3UKlZSGvHFNyiocPn7bgU68cNz94ug==",
"dotnet-test-nunit/3.4.0-beta-2": {
"sha512": "ImRqxsZMFzn+bM33emu5ZMzIUe8GOHPxdA2NkUIsawWd4YjvcFzdIo5iNuejlY0lw1M+RMwRbpd/cKkhKft52g==",
"type": "package",
"path": "dotnet-test-nunit/3.4.0-beta-3",
"path": "dotnet-test-nunit/3.4.0-beta-2",
"files": [
"dotnet-test-nunit.3.4.0-beta-3.nupkg.sha512",
"dotnet-test-nunit.3.4.0-beta-2.nupkg.sha512",
"dotnet-test-nunit.nuspec",
"lib/net451/dotnet-test-nunit.exe",
"lib/netcoreapp1.0/dotnet-test-nunit.dll",
Expand Down Expand Up @@ -3512,20 +3508,18 @@
"lib/netstandard1.0/NuGet.Versioning.xml"
]
},
"NUnit/3.6.0": {
"sha512": "mqJP3DI3mE9YSx0o7R5loyIm0NnLJLSl2lmXn84gSkOmLWzlbrxS0oW/cCZF25tB+6Anday/mbkRUXmdvEvSCg==",
"NUnit/3.5.0": {
"sha512": "z/03NfC+ci3/OzlIlClH+nd0LZ99OU3wS74nAtLVnPo1oq6nvd+CGxTHZAc8n8xxDLeLMofZ8AjahxlROEhf/Q==",
"type": "package",
"path": "NUnit/3.6.0",
"path": "NUnit/3.5.0",
"files": [
"CHANGES.txt",
"LICENSE.txt",
"NOTICES.txt",
"NUnit.3.6.0.nupkg.sha512",
"NUnit.3.5.0.nupkg.sha512",
"NUnit.nuspec",
"lib/MonoAndroid/nunit.framework.dll",
"lib/MonoAndroid/nunit.framework.xml",
"lib/Xamarin.iOS10/nunit.framework.dll",
"lib/Xamarin.iOS10/nunit.framework.xml",
"lib/dotnet/nunit.framework.dll",
"lib/dotnet/nunit.framework.xml",
"lib/net20/NUnit.System.Linq.dll",
"lib/net20/nunit.framework.dll",
"lib/net20/nunit.framework.xml",
Expand All @@ -3535,18 +3529,16 @@
"lib/net40/nunit.framework.xml",
"lib/net45/nunit.framework.dll",
"lib/net45/nunit.framework.xml",
"lib/netstandard1.6/nunit.framework.dll",
"lib/netstandard1.6/nunit.framework.xml",
"lib/portable-net45+win8+wp8+wpa81/nunit.framework.dll",
"lib/portable-net45+win8+wp8+wpa81/nunit.framework.xml"
"lib/portable-net45+win8+wp8+wpa81+Xamarin.Mac+MonoAndroid10+MonoTouch10+Xamarin.iOS10/nunit.framework.dll",
"lib/portable-net45+win8+wp8+wpa81+Xamarin.Mac+MonoAndroid10+MonoTouch10+Xamarin.iOS10/nunit.framework.xml"
]
},
"NUnit.Portable.Agent/3.4.0-beta-3": {
"sha512": "BcgxOFtivkK8+eu5krwxxTWw6RtASfp7q/nQ18Z5a2rEvDHZz9h+U2fJjIrw4P9CtqbzZwNuGHprUnhoVqlZrw==",
"NUnit.Portable.Agent/3.4.0-beta-2": {
"sha512": "91XarG692I/gCwDi3d1k5tRc1J4mPo6l6nHrYRKWSgc9M2a0SX1YNS/uJwth3bXQEn+499mdZ02NTuvJstRq1A==",
"type": "package",
"path": "NUnit.Portable.Agent/3.4.0-beta-3",
"path": "NUnit.Portable.Agent/3.4.0-beta-2",
"files": [
"NUnit.Portable.Agent.3.4.0-beta-3.nupkg.sha512",
"NUnit.Portable.Agent.3.4.0-beta-2.nupkg.sha512",
"NUnit.Portable.Agent.nuspec",
"lib/net45/NUnit.Portable.Agent.dll",
"lib/netstandard1.3/NUnit.Portable.Agent.dll"
Expand Down Expand Up @@ -8331,12 +8323,12 @@
"KafkaClient",
"KafkaClient.Testing",
"NSubstitute >= 2.0.0-rc",
"NUnit >= 3.6.0",
"NUnit >= 3.5.0",
"Nito.AsyncEx.Coordination >= 1.0.2",
"System.Collections.Immutable >= 1.3.1",
"System.Net.Sockets >= 4.3.0",
"System.Threading.Tasks.Parallel >= 4.3.0",
"dotnet-test-nunit >= 3.4.0-beta-3"
"dotnet-test-nunit >= 3.4.0-beta-2"
],
".NETCoreApp,Version=v1.1": [
"Microsoft.NETCore.App >= 1.1.0-*",
Expand Down
12 changes: 7 additions & 5 deletions src/KafkaClient/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,17 @@ public static async Task<T> ThrowIfCancellationRequested<T>(this Task<T> task, C
return await task.ConfigureAwait(false);
}

public static async Task<bool> IsCancelled(this Task task, CancellationToken cancellationToken)
public static async Task ThrowIfCancellationRequested(this Task task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
using (cancellationToken.Register(_ => ((TaskCompletionSource<bool>)_).TrySetResult(true), tcs)) {
if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)) {
return true;
using (cancellationToken.Register(_ => ((TaskCompletionSource<bool>)_).TrySetResult(true), tcs))
{
if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
{
throw new OperationCanceledException(cancellationToken);
}
}
return false;
await task.ConfigureAwait(false);
}

#endregion
Expand Down
92 changes: 49 additions & 43 deletions src/KafkaClient/Connections/ReconnectingSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal class ReconnectingSocket : IDisposable
private readonly IConnectionConfiguration _configuration;

private int _disposeCount; // = 0;
private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();

private Socket _socket;
private readonly SemaphoreSlim _connectSemaphore = new SemaphoreSlim(1, 1);
Expand Down Expand Up @@ -68,55 +69,60 @@ public async Task<Socket> ConnectAsync(CancellationToken cancellationToken)
if (_disposeCount > 0) throw new ObjectDisposedException(nameof(ReconnectingSocket));
if (_socket?.Connected ?? cancellationToken.IsCancellationRequested) return _socket;

return await _connectSemaphore.LockAsync(
async () => {
if (_socket?.Connected ?? cancellationToken.IsCancellationRequested) return _socket;
var socket = _socket ?? CreateSocket();
_socket = await _configuration.ConnectionRetry.TryAsync(
//action
async (attempt, timer) => {
if (cancellationToken.IsCancellationRequested) return RetryAttempt<Socket>.Abort;

_log.Info(() => LogEvent.Create($"Connecting to {_endpoint}"));
_configuration.OnConnecting?.Invoke(_endpoint, attempt, timer.Elapsed);

await socket.ConnectAsync(_endpoint.Ip.Address, _endpoint.Ip.Port).ConfigureAwait(false);
if (!socket.Connected) return RetryAttempt<Socket>.Retry;

_log.Info(() => LogEvent.Create($"Connection established to {_endpoint}"));
_configuration.OnConnected?.Invoke(_endpoint, attempt, timer.Elapsed);
return new RetryAttempt<Socket>(socket);
},
(attempt, retry) => _log.Warn(() =>LogEvent.Create($"Failed connection to {_endpoint}: Will retry in {retry}")),
attempt => {
_log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint} on attempt {attempt}"));
throw new ConnectionException(_endpoint);
},
(ex, attempt, retry) => {
if (_disposeCount > 0) throw new ObjectDisposedException(nameof(ReconnectingSocket), ex);
_log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint}: Will retry in {retry}"));

if (ex is ObjectDisposedException || ex is PlatformNotSupportedException) {
Disconnect();
_log.Info(() => LogEvent.Create($"Creating new socket to {_endpoint}"));
socket = CreateSocket();
}
},
(ex, attempt) => {
_log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint} on attempt {attempt}"));
if (ex is SocketException || ex is PlatformNotSupportedException) {
throw new ConnectionException(_endpoint, ex);
}
},
cancellationToken).ConfigureAwait(false);
return _socket;
}, cancellationToken).ConfigureAwait(false);
using (var cancellation = CancellationTokenSource.CreateLinkedTokenSource(_disposeToken.Token, cancellationToken)) {
return await _connectSemaphore.LockAsync(
async () => {
if (_socket?.Connected ?? cancellation.Token.IsCancellationRequested) return _socket;
var socket = _socket ?? CreateSocket();
_socket = await _configuration.ConnectionRetry.TryAsync(
//action
async (attempt, timer) => {
if (cancellation.Token.IsCancellationRequested) return RetryAttempt<Socket>.Abort;

_log.Info(() => LogEvent.Create($"Connecting to {_endpoint}"));
_configuration.OnConnecting?.Invoke(_endpoint, attempt, timer.Elapsed);

await socket.ConnectAsync(_endpoint.Ip.Address, _endpoint.Ip.Port).ThrowIfCancellationRequested(cancellation.Token).ConfigureAwait(false);
if (!socket.Connected) return RetryAttempt<Socket>.Retry;

_log.Info(() => LogEvent.Create($"Connection established to {_endpoint}"));
_configuration.OnConnected?.Invoke(_endpoint, attempt, timer.Elapsed);
return new RetryAttempt<Socket>(socket);
},
(attempt, retry) => _log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint}: Will retry in {retry}")),
attempt => {
_log.Warn(() => LogEvent.Create($"Failed connection to {_endpoint} on attempt {attempt}"));
throw new ConnectionException(_endpoint);
},
(ex, attempt, retry) => {
if (_disposeCount > 0) throw new ObjectDisposedException(nameof(ReconnectingSocket), ex);
_log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint}: Will retry in {retry}"));

if (ex is ObjectDisposedException || ex is PlatformNotSupportedException)
{
Disconnect();
_log.Info(() => LogEvent.Create($"Creating new socket to {_endpoint}"));
socket = CreateSocket();
}
},
(ex, attempt) => {
_log.Warn(() => LogEvent.Create(ex, $"Failed connection to {_endpoint} on attempt {attempt}"));
if (ex is SocketException || ex is PlatformNotSupportedException)
{
throw new ConnectionException(_endpoint, ex);
}
},
cancellation.Token).ConfigureAwait(false);
return _socket;
}, cancellation.Token).ConfigureAwait(false);
}
}

public void Dispose()
{
if (Interlocked.Increment(ref _disposeCount) != 1) return;

_disposeToken.Cancel();
_connectSemaphore.Dispose();
Disconnect();
}
Expand Down

0 comments on commit 273f708

Please sign in to comment.