Skip to content

Commit

Permalink
more robust testing of Connection #18
Browse files Browse the repository at this point in the history
  • Loading branch information
awr committed Feb 2, 2017
1 parent 73d6f58 commit 75199f3
Show file tree
Hide file tree
Showing 24 changed files with 1,251 additions and 1,092 deletions.
7 changes: 3 additions & 4 deletions src/KafkaClient.Performance/ProduceRequestBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ public void SetupData()
var port = 10000;
var endpoint = new Endpoint(new IPEndPoint(IPAddress.Loopback, port), "localhost");
_server = new TcpServer(endpoint.Ip.Port) {
OnBytesReceived = b =>
{
var header = KafkaDecoder.DecodeHeader(b.Skip(4));
OnReceivedAsync = async data => {
var header = KafkaDecoder.DecodeHeader(data.Skip(4));
var bytes = KafkaDecoder.EncodeResponseBytes(new RequestContext(header.CorrelationId), response);
AsyncContext.Run(async () => await _server.SendDataAsync(bytes));
await _server.SendDataAsync(bytes);
}
};
_connection = new Connection(endpoint);
Expand Down
22 changes: 19 additions & 3 deletions src/KafkaClient.Testing/KafkaDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ public static IRequestContext DecodeHeader(ArraySegment<byte> bytes)
}
}

public static Tuple<IRequestContext, ApiKey> DecodeFullHeader(ArraySegment<byte> bytes)
{
ApiKey apiKey;
IRequestContext context;
using (ReadHeader(bytes, out apiKey, out context))
{
return new Tuple<IRequestContext, ApiKey>(context, apiKey);
}
}

public static T Decode<T>(ArraySegment<byte> bytes, IRequestContext context = null) where T : class, IRequest
{
var protocolType = context?.ProtocolType;
Expand Down Expand Up @@ -403,23 +413,29 @@ private static IKafkaReader ReadHeader(ArraySegment<byte> data)
}

private static IKafkaReader ReadHeader(ArraySegment<byte> data, out IRequestContext context)
{
ApiKey apikey;
return ReadHeader(data, out apikey, out context);
}

private static IKafkaReader ReadHeader(ArraySegment<byte> data, out ApiKey apiKey, out IRequestContext context)
{
var reader = new KafkaReader(data);
try {
// ReSharper disable once UnusedVariable
var apiKey = (ApiKey) reader.ReadInt16();
apiKey = (ApiKey)reader.ReadInt16();
var version = reader.ReadInt16();
var correlationId = reader.ReadInt32();
var clientId = reader.ReadString();

context = new RequestContext(correlationId, version, clientId);
} catch {
apiKey = 0;
context = null;
reader.Dispose();
reader = null;
}
return reader;
}
}

#endregion

Expand Down
16 changes: 8 additions & 8 deletions src/KafkaClient.Testing/TcpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace KafkaClient.Testing
{
public class TcpServer : IDisposable
{
public Action<ArraySegment<byte>> OnBytesReceived { get; set; }
public Func<ArraySegment<byte>, Task> OnReceivedAsync { get; set; }
public Action OnConnected { get; set; }
public Action OnDisconnected { get; set; }

Expand All @@ -34,7 +34,7 @@ public async Task<bool> SendDataAsync(ArraySegment<byte> data)
{
try {
await _clientSemaphore.WaitAsync();
_log.Debug(() => LogEvent.Create($"FAKE Server: writing {data.Count} bytes."));
_log.Debug(() => LogEvent.Create($"SERVER: writing {data.Count} bytes."));
await _client.GetStream().WriteAsync(data.Array, data.Offset, data.Count).ConfigureAwait(false);
return true;
} catch (Exception ex) {
Expand All @@ -58,9 +58,9 @@ private async Task ClientConnectAsync()
{
var buffer = new byte[8192];
while (!_disposeToken.IsCancellationRequested) {
_log.Debug(() => LogEvent.Create("Server: Accepting clients."));
_log.Debug(() => LogEvent.Create("SERVER: Accepting clients."));
_client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
_log.Debug(() => LogEvent.Create("Server: Connected client"));
_log.Debug(() => LogEvent.Create("SERVER: Connected client"));

_connectedTrigger.TrySetResult(true);
OnConnected?.Invoke();
Expand All @@ -72,19 +72,19 @@ private async Task ClientConnectAsync()
var read = await stream.ReadAsync(buffer, 0, buffer.Length, _disposeToken.Token)
.ThrowIfCancellationRequested(_disposeToken.Token)
.ConfigureAwait(false);
if (read > 0) {
OnBytesReceived?.Invoke(new ArraySegment<byte>(buffer, 0, read));
if (read > 0 && OnReceivedAsync != null) {
await OnReceivedAsync(new ArraySegment<byte>(buffer, 0, read));
}
}
} catch (Exception ex) {
if (!(ex is OperationCanceledException)) {
_log.Debug(() => LogEvent.Create(ex, "Server: client failed"));
_log.Debug(() => LogEvent.Create(ex, "SERVER: client failed"));
}
} finally {
_client?.Dispose();
}

_log.Debug(() => LogEvent.Create("Server: Client Disconnected."));
_log.Debug(() => LogEvent.Create("SERVER: Client Disconnected."));
await _clientSemaphore.WaitAsync(_disposeToken.Token); //remove the one client
_connectedTrigger = new TaskCompletionSource<bool>();
OnDisconnected?.Invoke();
Expand Down
1 change: 0 additions & 1 deletion src/KafkaClient.Tests/ByteMembershipEncoder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using KafkaClient.Assignment;
using KafkaClient.Common;
using KafkaClient.Protocol;

namespace KafkaClient.Tests
Expand Down
1 change: 0 additions & 1 deletion src/KafkaClient.Tests/ConsoleLog.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Immutable;
using KafkaClient.Common;
using NUnit.Framework;

namespace KafkaClient.Tests
{
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaClient.Tests/FakeRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public IRouter Create(TimeSpan? cacheExpiration = null)
return new Router(
new [] { new Endpoint(new IPEndPoint(IPAddress.Loopback, 1)), new Endpoint(new IPEndPoint(IPAddress.Loopback, 2)) },
KafkaConnectionFactory,
routerConfiguration: new RouterConfiguration(cacheExpiration: cacheExpiration.GetValueOrDefault(TimeSpan.FromMilliseconds(1))),
routerConfiguration: new RouterConfiguration(cacheExpiration: cacheExpiration.GetValueOrDefault(TimeSpan.FromMinutes(1))),
log: TestConfig.Log);
}

Expand Down
60 changes: 28 additions & 32 deletions src/KafkaClient.Tests/Integration/ConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,44 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using KafkaClient.Connections;
using KafkaClient.Common;
using KafkaClient.Protocol;
using Nito.AsyncEx;
using NUnit.Framework;

namespace KafkaClient.Tests.Integration
{
[TestFixture]
public class ConnectionTests
{
private Connection _conn;

[SetUp]
public void Setup()
{
var options = new KafkaOptions(TestConfig.IntegrationUri, new ConnectionConfiguration(versionSupport: VersionSupport.Kafka9.Dynamic()), log: TestConfig.Log);
var endpoint = AsyncContext.Run(() => Endpoint.ResolveAsync(options.ServerUris.First(), options.Log));

_conn = new Connection(endpoint, options.ConnectionConfiguration, TestConfig.Log);
}

[Test]
public async Task EnsureTwoRequestsCanCallOneAfterAnother()
{
var result1 = await _conn.SendAsync(new MetadataRequest(), CancellationToken.None);
var result2 = await _conn.SendAsync(new MetadataRequest(), CancellationToken.None);
Assert.That(result1.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
Assert.That(result2.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
await Async.Using(
await TestConfig.Options.CreateConnectionAsync(),
async connection => {
var result1 = await connection.SendAsync(new MetadataRequest(), CancellationToken.None);
var result2 = await connection.SendAsync(new MetadataRequest(), CancellationToken.None);
Assert.That(result1.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
Assert.That(result2.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
});
}

[Test]
public async Task EnsureAsyncRequestResponsesCorrelate()
{
var result1 = _conn.SendAsync(new MetadataRequest(), CancellationToken.None);
var result2 = _conn.SendAsync(new MetadataRequest(), CancellationToken.None);
var result3 = _conn.SendAsync(new MetadataRequest(), CancellationToken.None);

await Task.WhenAll(result1, result2, result3);

Assert.That(result1.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
Assert.That(result2.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
Assert.That(result3.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
await Async.Using(
await TestConfig.Options.CreateConnectionAsync(),
async connection => {
var result1 = connection.SendAsync(new MetadataRequest(), CancellationToken.None);
var result2 = connection.SendAsync(new MetadataRequest(), CancellationToken.None);
var result3 = connection.SendAsync(new MetadataRequest(), CancellationToken.None);

await Task.WhenAll(result1, result2, result3);

Assert.That(result1.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
Assert.That(result2.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
Assert.That(result3.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0));
});
}

[Test]
Expand All @@ -54,7 +50,7 @@ public async Task EnsureMultipleAsyncRequestsCanReadResponses([Values(1, 5)] int
var requestTasks = new ConcurrentBag<Task<MetadataResponse>>();
using (var router = await TestConfig.Options.CreateRouterAsync()) {
await router.TemporaryTopicAsync(async topicName => {
var singleResult = await _conn.SendAsync(new MetadataRequest(TestConfig.TopicName()), CancellationToken.None);
var singleResult = await router.Connections.First().SendAsync(new MetadataRequest(TestConfig.TopicName()), CancellationToken.None);
Assert.That(singleResult.Topics.Count, Is.GreaterThan(0));
Assert.That(singleResult.Topics.First().Partitions.Count, Is.GreaterThan(0));

Expand All @@ -64,7 +60,7 @@ await router.TemporaryTopicAsync(async topicName => {
while (true) {
await Task.Delay(1);
if (Interlocked.Increment(ref requestsSoFar) > totalRequests) break;
requestTasks.Add(_conn.SendAsync(new MetadataRequest(), CancellationToken.None));
requestTasks.Add(router.Connections.First().SendAsync(new MetadataRequest(), CancellationToken.None));
}
}));
}
Expand All @@ -85,10 +81,10 @@ public async Task EnsureDifferentTypesOfResponsesCanBeReadAsync()
using (var router = await TestConfig.Options.CreateRouterAsync()) {
await router.TemporaryTopicAsync(
async topicName => {
var result1 = _conn.SendAsync(RequestFactory.CreateProduceRequest(topicName, "test"), CancellationToken.None);
var result2 = _conn.SendAsync(new MetadataRequest(topicName), CancellationToken.None);
var result3 = _conn.SendAsync(RequestFactory.CreateOffsetRequest(topicName), CancellationToken.None);
var result4 = _conn.SendAsync(RequestFactory.CreateFetchRequest(topicName, 0), CancellationToken.None);
var result1 = router.Connections.First().SendAsync(RequestFactory.CreateProduceRequest(topicName, "test"), CancellationToken.None);
var result2 = router.Connections.First().SendAsync(new MetadataRequest(topicName), CancellationToken.None);
var result3 = router.Connections.First().SendAsync(RequestFactory.CreateOffsetRequest(topicName), CancellationToken.None);
var result4 = router.Connections.First().SendAsync(RequestFactory.CreateFetchRequest(topicName, 0), CancellationToken.None);

await Task.WhenAll(result1, result2, result3, result4);

Expand Down
6 changes: 3 additions & 3 deletions src/KafkaClient.Tests/TestConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ public static string GroupId([CallerMemberName] string name = null)
}

// turned down to reduce log noise -- turn up Level if necessary
public static ILog Log = new ConsoleLog();
public static readonly ILog Log = new ConsoleLog();

public static Uri ServerUri()
public static Endpoint ServerEndpoint()
{
return new Uri($"http://localhost:{ServerPort()}");
return new Endpoint(new IPEndPoint(IPAddress.Loopback, ServerPort()), "localhost");
}

public static int ServerPort()
Expand Down
9 changes: 9 additions & 0 deletions src/KafkaClient.Tests/Unit/AssignmentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public async Task AssigmentSucceedsWhenStrategyExists()
}
}

[Test]
public void InterfacesAreFormattedWithinProtocol()
{
var request = new SyncGroupRequest("group", 5, "member", new[] { new SyncGroupRequest.GroupAssignment("member", new ConsumerMemberAssignment(new[] { new TopicPartition("topic", 0), new TopicPartition("topic", 1) })) });
var formatted = request.ToString();
Assert.That(formatted.Contains("TopicName:topic"));
Assert.That(formatted.Contains("PartitionId:1"));
}

// design unit TESTS to write:
// assignment priority is given to first assignor if multiple available
// non-leader calls to get assignment data
Expand Down
Loading

0 comments on commit 75199f3

Please sign in to comment.