diff --git a/src/KafkaClient.Performance/ProduceRequestBenchmark.cs b/src/KafkaClient.Performance/ProduceRequestBenchmark.cs index 4da9194d..d59dc539 100644 --- a/src/KafkaClient.Performance/ProduceRequestBenchmark.cs +++ b/src/KafkaClient.Performance/ProduceRequestBenchmark.cs @@ -56,11 +56,10 @@ public void SetupData() var port = 10000; var endpoint = new Endpoint(new IPEndPoint(IPAddress.Loopback, port), "localhost"); _server = new TcpServer(endpoint.Ip.Port) { - OnBytesReceived = b => - { - var header = KafkaDecoder.DecodeHeader(b.Skip(4)); + OnReceivedAsync = async data => { + var header = KafkaDecoder.DecodeHeader(data.Skip(4)); var bytes = KafkaDecoder.EncodeResponseBytes(new RequestContext(header.CorrelationId), response); - AsyncContext.Run(async () => await _server.SendDataAsync(bytes)); + await _server.SendDataAsync(bytes); } }; _connection = new Connection(endpoint); diff --git a/src/KafkaClient.Testing/KafkaDecoder.cs b/src/KafkaClient.Testing/KafkaDecoder.cs index 083dc66d..1bc04684 100644 --- a/src/KafkaClient.Testing/KafkaDecoder.cs +++ b/src/KafkaClient.Testing/KafkaDecoder.cs @@ -19,6 +19,16 @@ public static IRequestContext DecodeHeader(ArraySegment bytes) } } + public static Tuple DecodeFullHeader(ArraySegment bytes) + { + ApiKey apiKey; + IRequestContext context; + using (ReadHeader(bytes, out apiKey, out context)) + { + return new Tuple(context, apiKey); + } + } + public static T Decode(ArraySegment bytes, IRequestContext context = null) where T : class, IRequest { var protocolType = context?.ProtocolType; @@ -403,23 +413,29 @@ private static IKafkaReader ReadHeader(ArraySegment data) } private static IKafkaReader ReadHeader(ArraySegment data, out IRequestContext context) + { + ApiKey apikey; + return ReadHeader(data, out apikey, out context); + } + + private static IKafkaReader ReadHeader(ArraySegment data, out ApiKey apiKey, out IRequestContext context) { var reader = new KafkaReader(data); try { - // ReSharper disable once UnusedVariable - var apiKey = (ApiKey) reader.ReadInt16(); + apiKey = (ApiKey)reader.ReadInt16(); var version = reader.ReadInt16(); var correlationId = reader.ReadInt32(); var clientId = reader.ReadString(); context = new RequestContext(correlationId, version, clientId); } catch { + apiKey = 0; context = null; reader.Dispose(); reader = null; } return reader; - } + } #endregion diff --git a/src/KafkaClient.Testing/TcpServer.cs b/src/KafkaClient.Testing/TcpServer.cs index 7b5e0a1b..80285234 100644 --- a/src/KafkaClient.Testing/TcpServer.cs +++ b/src/KafkaClient.Testing/TcpServer.cs @@ -10,7 +10,7 @@ namespace KafkaClient.Testing { public class TcpServer : IDisposable { - public Action> OnBytesReceived { get; set; } + public Func, Task> OnReceivedAsync { get; set; } public Action OnConnected { get; set; } public Action OnDisconnected { get; set; } @@ -34,7 +34,7 @@ public async Task SendDataAsync(ArraySegment data) { try { await _clientSemaphore.WaitAsync(); - _log.Debug(() => LogEvent.Create($"FAKE Server: writing {data.Count} bytes.")); + _log.Debug(() => LogEvent.Create($"SERVER: writing {data.Count} bytes.")); await _client.GetStream().WriteAsync(data.Array, data.Offset, data.Count).ConfigureAwait(false); return true; } catch (Exception ex) { @@ -58,9 +58,9 @@ private async Task ClientConnectAsync() { var buffer = new byte[8192]; while (!_disposeToken.IsCancellationRequested) { - _log.Debug(() => LogEvent.Create("Server: Accepting clients.")); + _log.Debug(() => LogEvent.Create("SERVER: Accepting clients.")); _client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false); - _log.Debug(() => LogEvent.Create("Server: Connected client")); + _log.Debug(() => LogEvent.Create("SERVER: Connected client")); _connectedTrigger.TrySetResult(true); OnConnected?.Invoke(); @@ -72,19 +72,19 @@ private async Task ClientConnectAsync() var read = await stream.ReadAsync(buffer, 0, buffer.Length, _disposeToken.Token) .ThrowIfCancellationRequested(_disposeToken.Token) .ConfigureAwait(false); - if (read > 0) { - OnBytesReceived?.Invoke(new ArraySegment(buffer, 0, read)); + if (read > 0 && OnReceivedAsync != null) { + await OnReceivedAsync(new ArraySegment(buffer, 0, read)); } } } catch (Exception ex) { if (!(ex is OperationCanceledException)) { - _log.Debug(() => LogEvent.Create(ex, "Server: client failed")); + _log.Debug(() => LogEvent.Create(ex, "SERVER: client failed")); } } finally { _client?.Dispose(); } - _log.Debug(() => LogEvent.Create("Server: Client Disconnected.")); + _log.Debug(() => LogEvent.Create("SERVER: Client Disconnected.")); await _clientSemaphore.WaitAsync(_disposeToken.Token); //remove the one client _connectedTrigger = new TaskCompletionSource(); OnDisconnected?.Invoke(); diff --git a/src/KafkaClient.Tests/ByteMembershipEncoder.cs b/src/KafkaClient.Tests/ByteMembershipEncoder.cs index ac931953..c1133182 100644 --- a/src/KafkaClient.Tests/ByteMembershipEncoder.cs +++ b/src/KafkaClient.Tests/ByteMembershipEncoder.cs @@ -1,5 +1,4 @@ using KafkaClient.Assignment; -using KafkaClient.Common; using KafkaClient.Protocol; namespace KafkaClient.Tests diff --git a/src/KafkaClient.Tests/ConsoleLog.cs b/src/KafkaClient.Tests/ConsoleLog.cs index 8c132c68..e7d0cf67 100644 --- a/src/KafkaClient.Tests/ConsoleLog.cs +++ b/src/KafkaClient.Tests/ConsoleLog.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Immutable; using KafkaClient.Common; -using NUnit.Framework; namespace KafkaClient.Tests { diff --git a/src/KafkaClient.Tests/FakeRouter.cs b/src/KafkaClient.Tests/FakeRouter.cs index 68da0580..8b30d806 100644 --- a/src/KafkaClient.Tests/FakeRouter.cs +++ b/src/KafkaClient.Tests/FakeRouter.cs @@ -75,7 +75,7 @@ public IRouter Create(TimeSpan? cacheExpiration = null) return new Router( new [] { new Endpoint(new IPEndPoint(IPAddress.Loopback, 1)), new Endpoint(new IPEndPoint(IPAddress.Loopback, 2)) }, KafkaConnectionFactory, - routerConfiguration: new RouterConfiguration(cacheExpiration: cacheExpiration.GetValueOrDefault(TimeSpan.FromMilliseconds(1))), + routerConfiguration: new RouterConfiguration(cacheExpiration: cacheExpiration.GetValueOrDefault(TimeSpan.FromMinutes(1))), log: TestConfig.Log); } diff --git a/src/KafkaClient.Tests/Integration/ConnectionTests.cs b/src/KafkaClient.Tests/Integration/ConnectionTests.cs index 565e5253..7d47af04 100644 --- a/src/KafkaClient.Tests/Integration/ConnectionTests.cs +++ b/src/KafkaClient.Tests/Integration/ConnectionTests.cs @@ -3,9 +3,8 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using KafkaClient.Connections; +using KafkaClient.Common; using KafkaClient.Protocol; -using Nito.AsyncEx; using NUnit.Framework; namespace KafkaClient.Tests.Integration @@ -13,38 +12,35 @@ namespace KafkaClient.Tests.Integration [TestFixture] public class ConnectionTests { - private Connection _conn; - - [SetUp] - public void Setup() - { - var options = new KafkaOptions(TestConfig.IntegrationUri, new ConnectionConfiguration(versionSupport: VersionSupport.Kafka9.Dynamic()), log: TestConfig.Log); - var endpoint = AsyncContext.Run(() => Endpoint.ResolveAsync(options.ServerUris.First(), options.Log)); - - _conn = new Connection(endpoint, options.ConnectionConfiguration, TestConfig.Log); - } - [Test] public async Task EnsureTwoRequestsCanCallOneAfterAnother() { - var result1 = await _conn.SendAsync(new MetadataRequest(), CancellationToken.None); - var result2 = await _conn.SendAsync(new MetadataRequest(), CancellationToken.None); - Assert.That(result1.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); - Assert.That(result2.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + await Async.Using( + await TestConfig.Options.CreateConnectionAsync(), + async connection => { + var result1 = await connection.SendAsync(new MetadataRequest(), CancellationToken.None); + var result2 = await connection.SendAsync(new MetadataRequest(), CancellationToken.None); + Assert.That(result1.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + Assert.That(result2.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + }); } [Test] public async Task EnsureAsyncRequestResponsesCorrelate() { - var result1 = _conn.SendAsync(new MetadataRequest(), CancellationToken.None); - var result2 = _conn.SendAsync(new MetadataRequest(), CancellationToken.None); - var result3 = _conn.SendAsync(new MetadataRequest(), CancellationToken.None); - - await Task.WhenAll(result1, result2, result3); - - Assert.That(result1.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); - Assert.That(result2.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); - Assert.That(result3.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + await Async.Using( + await TestConfig.Options.CreateConnectionAsync(), + async connection => { + var result1 = connection.SendAsync(new MetadataRequest(), CancellationToken.None); + var result2 = connection.SendAsync(new MetadataRequest(), CancellationToken.None); + var result3 = connection.SendAsync(new MetadataRequest(), CancellationToken.None); + + await Task.WhenAll(result1, result2, result3); + + Assert.That(result1.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + Assert.That(result2.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + Assert.That(result3.Result.Errors.Count(code => code != ErrorCode.None), Is.EqualTo(0)); + }); } [Test] @@ -54,7 +50,7 @@ public async Task EnsureMultipleAsyncRequestsCanReadResponses([Values(1, 5)] int var requestTasks = new ConcurrentBag>(); using (var router = await TestConfig.Options.CreateRouterAsync()) { await router.TemporaryTopicAsync(async topicName => { - var singleResult = await _conn.SendAsync(new MetadataRequest(TestConfig.TopicName()), CancellationToken.None); + var singleResult = await router.Connections.First().SendAsync(new MetadataRequest(TestConfig.TopicName()), CancellationToken.None); Assert.That(singleResult.Topics.Count, Is.GreaterThan(0)); Assert.That(singleResult.Topics.First().Partitions.Count, Is.GreaterThan(0)); @@ -64,7 +60,7 @@ await router.TemporaryTopicAsync(async topicName => { while (true) { await Task.Delay(1); if (Interlocked.Increment(ref requestsSoFar) > totalRequests) break; - requestTasks.Add(_conn.SendAsync(new MetadataRequest(), CancellationToken.None)); + requestTasks.Add(router.Connections.First().SendAsync(new MetadataRequest(), CancellationToken.None)); } })); } @@ -85,10 +81,10 @@ public async Task EnsureDifferentTypesOfResponsesCanBeReadAsync() using (var router = await TestConfig.Options.CreateRouterAsync()) { await router.TemporaryTopicAsync( async topicName => { - var result1 = _conn.SendAsync(RequestFactory.CreateProduceRequest(topicName, "test"), CancellationToken.None); - var result2 = _conn.SendAsync(new MetadataRequest(topicName), CancellationToken.None); - var result3 = _conn.SendAsync(RequestFactory.CreateOffsetRequest(topicName), CancellationToken.None); - var result4 = _conn.SendAsync(RequestFactory.CreateFetchRequest(topicName, 0), CancellationToken.None); + var result1 = router.Connections.First().SendAsync(RequestFactory.CreateProduceRequest(topicName, "test"), CancellationToken.None); + var result2 = router.Connections.First().SendAsync(new MetadataRequest(topicName), CancellationToken.None); + var result3 = router.Connections.First().SendAsync(RequestFactory.CreateOffsetRequest(topicName), CancellationToken.None); + var result4 = router.Connections.First().SendAsync(RequestFactory.CreateFetchRequest(topicName, 0), CancellationToken.None); await Task.WhenAll(result1, result2, result3, result4); diff --git a/src/KafkaClient.Tests/TestConfig.cs b/src/KafkaClient.Tests/TestConfig.cs index 0dd1870f..9bbb77be 100644 --- a/src/KafkaClient.Tests/TestConfig.cs +++ b/src/KafkaClient.Tests/TestConfig.cs @@ -20,11 +20,11 @@ public static string GroupId([CallerMemberName] string name = null) } // turned down to reduce log noise -- turn up Level if necessary - public static ILog Log = new ConsoleLog(); + public static readonly ILog Log = new ConsoleLog(); - public static Uri ServerUri() + public static Endpoint ServerEndpoint() { - return new Uri($"http://localhost:{ServerPort()}"); + return new Endpoint(new IPEndPoint(IPAddress.Loopback, ServerPort()), "localhost"); } public static int ServerPort() diff --git a/src/KafkaClient.Tests/Unit/AssignmentTests.cs b/src/KafkaClient.Tests/Unit/AssignmentTests.cs index 8420ff81..9c350bc2 100644 --- a/src/KafkaClient.Tests/Unit/AssignmentTests.cs +++ b/src/KafkaClient.Tests/Unit/AssignmentTests.cs @@ -127,6 +127,15 @@ public async Task AssigmentSucceedsWhenStrategyExists() } } + [Test] + public void InterfacesAreFormattedWithinProtocol() + { + var request = new SyncGroupRequest("group", 5, "member", new[] { new SyncGroupRequest.GroupAssignment("member", new ConsumerMemberAssignment(new[] { new TopicPartition("topic", 0), new TopicPartition("topic", 1) })) }); + var formatted = request.ToString(); + Assert.That(formatted.Contains("TopicName:topic")); + Assert.That(formatted.Contains("PartitionId:1")); + } + // design unit TESTS to write: // assignment priority is given to first assignor if multiple available // non-leader calls to get assignment data diff --git a/src/KafkaClient.Tests/Unit/ConnectionTests.cs b/src/KafkaClient.Tests/Unit/ConnectionTests.cs index 38ca5f6a..bcd34f8e 100644 --- a/src/KafkaClient.Tests/Unit/ConnectionTests.cs +++ b/src/KafkaClient.Tests/Unit/ConnectionTests.cs @@ -1,16 +1,12 @@ using System; -using System.Collections.Concurrent; -using System.Collections.Generic; using System.IO; using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; using KafkaClient.Common; using KafkaClient.Connections; using KafkaClient.Protocol; using KafkaClient.Testing; -using Nito.AsyncEx; using NUnit.Framework; namespace KafkaClient.Tests.Unit @@ -23,7 +19,7 @@ public class ConnectionTests [Test] public async Task ShouldStartReadPollingOnConstruction() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var conn = new Connection(endpoint, log: TestConfig.Log)) { await TaskTest.WaitFor(() => conn.IsReaderAlive); @@ -32,9 +28,9 @@ public async Task ShouldStartReadPollingOnConstruction() } [Test] - public async Task ShouldReportServerUriOnConstruction() + public void ShouldReportServerUriOnConstruction() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var conn = new Connection(endpoint, log: TestConfig.Log)) { Assert.That(conn.Endpoint, Is.EqualTo(endpoint)); @@ -63,7 +59,7 @@ public async Task ShouldStartDedicatedThreadOnConstruction() { var count = 0; var config = new ConnectionConfiguration(onConnecting: (e, a, _) => Interlocked.Increment(ref count)); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var conn = new Connection(endpoint, config, log: TestConfig.Log)) { await TaskTest.WaitFor(() => count > 0); @@ -76,7 +72,7 @@ public async Task ShouldAttemptMultipleTimesWhenConnectionFails() { var count = 0; var config = new ConnectionConfiguration(onConnecting: (e, a, _) => Interlocked.Increment(ref count)); - using (var conn = new Connection(await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log), config, TestConfig.Log)) + using (var conn = new Connection(TestConfig.ServerEndpoint(), config, TestConfig.Log)) { var task = conn.SendAsync(new FetchRequest(), CancellationToken.None); //will force a connection await TaskTest.WaitFor(() => count > 1, 10000); @@ -91,7 +87,7 @@ public async Task ShouldAttemptMultipleTimesWhenConnectionFails() [Test] public async Task ShouldDisposeWithoutExceptionThrown() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { var conn = new Connection(endpoint, log: TestConfig.Log); @@ -103,7 +99,7 @@ public async Task ShouldDisposeWithoutExceptionThrown() [Test] public async Task ShouldDisposeWithoutExceptionEvenWhileCallingSendAsync() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var conn = new Connection(endpoint, log: TestConfig.Log)) { var task = conn.SendAsync(new MetadataRequest(), CancellationToken.None); @@ -132,7 +128,7 @@ public async Task ShouldLogDisconnectAndRecover([Values(3, 4)] int connectionAtt Interlocked.Increment(ref clientConnected); }); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log) { OnConnected = () => Interlocked.Increment(ref serverConnected) }) @@ -166,7 +162,7 @@ public async Task ShouldFinishPartiallyReadMessage() var config = new ConnectionConfiguration(onReadBytes: (e, attempted, actual, elapsed) => Interlocked.Add(ref bytesRead, actual)); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (new Connection(endpoint, config, mockLog)) { @@ -212,7 +208,7 @@ public async Task ReadShouldIgnoreMessageWithUnknownCorrelationId() var mockLog = new MemoryLog(); var config = new ConnectionConfiguration(onRead: (e, buffer, elapsed) => receivedData = true); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var conn = new Connection(endpoint, config, log: mockLog)) { @@ -240,7 +236,7 @@ public async Task ReadShouldCancelWhileAwaitingResponse() semaphore.Release(); }); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var conn = new Connection(endpoint, config, log: TestConfig.Log)) { @@ -248,7 +244,7 @@ public async Task ReadShouldCancelWhileAwaitingResponse() var taskResult = conn.SendAsync(new FetchRequest(), token.Token); - Thread.Sleep(100); + await Task.Delay(100); token.Cancel(); semaphore.Wait(TimeSpan.FromSeconds(1)); @@ -260,44 +256,34 @@ public async Task ReadShouldCancelWhileAwaitingResponse() [Test] public async Task ReadShouldCancelWhileAwaitingReconnection() { - int connectionAttempt = 0; - var config = new ConnectionConfiguration(onConnecting: (e, attempt, elapsed) => connectionAttempt = attempt); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); - using (var conn = new Connection(endpoint, config, TestConfig.Log)) - using (var token = new CancellationTokenSource()) - { - var taskResult = conn.SendAsync(new FetchRequest(), token.Token); - - await TaskTest.WaitFor(() => connectionAttempt > 1); - - token.Cancel(); - - await Task.WhenAny(taskResult, Task.Delay(500)); - - Assert.That(taskResult.IsCanceled, Is.True); + using (var token = new CancellationTokenSource()) { + var config = new ConnectionConfiguration(onConnecting: (e, attempt, elapsed) => token.Cancel()); + var endpoint = TestConfig.ServerEndpoint(); + using (var conn = new Connection(endpoint, config, TestConfig.Log)) { + var taskResult = conn.SendAsync(new FetchRequest(), token.Token); + await Task.WhenAny(taskResult, Task.Delay(500)); + Assert.That(taskResult.IsCanceled, Is.True); + } } } [Test] public async Task ReadShouldReconnectEvenAfterCancelledRead() { - int connectionAttempt = 0; - var config = new ConnectionConfiguration(onConnecting: (e, attempt, elapsed) => Interlocked.Exchange(ref connectionAttempt, attempt)); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); - using (var conn = new Connection(endpoint, config, TestConfig.Log)) - using (var token = new CancellationTokenSource()) - { - var taskResult = conn.SendAsync(new FetchRequest(), token.Token); - - await TaskTest.WaitFor(() => connectionAttempt > 1); - - var attemptsMadeSoFar = connectionAttempt; - - token.Cancel(); - - await TaskTest.WaitFor(() => connectionAttempt > attemptsMadeSoFar); - - Assert.That(connectionAttempt, Is.GreaterThan(attemptsMadeSoFar)); + using (var token = new CancellationTokenSource()) { + var connectionAttempt = 0; + var config = new ConnectionConfiguration( + onConnecting: (e, attempt, elapsed) => { + if (Interlocked.Increment(ref connectionAttempt) > 1) { + token.Cancel(); + } + }); + var endpoint = TestConfig.ServerEndpoint(); + using (var conn = new Connection(endpoint, config, TestConfig.Log)) { + var taskResult = conn.SendAsync(new FetchRequest(), token.Token); + var multipleAttempts = await TaskTest.WaitFor(() => connectionAttempt > 1); + Assert.That(multipleAttempts); + } } } @@ -305,7 +291,7 @@ public async Task ReadShouldReconnectEvenAfterCancelledRead() public async Task ShouldReconnectAfterLosingConnectionAndBeAbleToStartNewRead() { var log = TestConfig.Log; - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { var serverDisconnects = 0; var serverConnects = 0; @@ -347,13 +333,14 @@ public async Task ShouldReconnectAfterLosingConnectionAndBeAbleToStartNewRead() [Test] public async Task SendAsyncShouldTimeoutWhenSendAsyncTakesTooLong() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromMilliseconds(1)), log: TestConfig.Log)) { await Task.WhenAny(server.ClientConnected, Task.Delay(TimeSpan.FromSeconds(3))); var sendTask = conn.SendAsync(new MetadataRequest(), CancellationToken.None); + await Task.WhenAny(sendTask, Task.Delay(100)); Assert.That(sendTask.IsFaulted, Is.True, "Task should have reported an exception."); @@ -361,10 +348,28 @@ public async Task SendAsyncShouldTimeoutWhenSendAsyncTakesTooLong() } } + [Test] + public async Task SendAsyncShouldReturnImmediatelyWhenNoAcks() + { + var endpoint = TestConfig.ServerEndpoint(); + using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) + using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromMilliseconds(1)), log: TestConfig.Log)) + { + await Task.WhenAny(server.ClientConnected, Task.Delay(TimeSpan.FromSeconds(3))); + + var sendTask = conn.SendAsync(new ProduceRequest(new ProduceRequest.Topic("topic", 0, new []{ new Message("value") }), acks: 0), CancellationToken.None); + + await Task.WhenAny(sendTask, Task.Delay(100)); + + Assert.That(sendTask.IsFaulted, Is.False); + Assert.That(sendTask.IsCompleted); + } + } + [Test] public async Task SendAsyncShouldNotAllowResponseToTimeoutWhileAwaitingKafkaToEstableConnection() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromSeconds(1000)), log: TestConfig.Log)) { // SendAsync blocked by reconnection attempts @@ -378,9 +383,9 @@ public async Task SendAsyncShouldNotAllowResponseToTimeoutWhileAwaitingKafkaToEs using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { server.OnConnected = () => TestConfig.Log.Info(() => LogEvent.Create("Client connected...")); - server.OnBytesReceived = b => { - var requestContext = KafkaDecoder.DecodeHeader(b.Skip(KafkaEncoder.IntegerByteSize)); - AsyncContext.Run(async () => await server.SendDataAsync(MessageHelper.CreateMetadataResponse(requestContext, "Test"))); + server.OnReceivedAsync = async data => { + var requestContext = KafkaDecoder.DecodeHeader(data.Skip(KafkaEncoder.IntegerByteSize)); + await server.SendDataAsync(MessageHelper.CreateMetadataResponse(requestContext, "Test")); }; await Task.WhenAny(taskResult, Task.Delay(TimeSpan.FromSeconds(5))); @@ -397,14 +402,13 @@ public async Task SendAsyncShouldNotAllowResponseToTimeoutWhileAwaitingKafkaToEs public async Task SendAsyncShouldUseStatictVersionInfo() { IRequestContext context = null; - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromSeconds(1000), versionSupport: VersionSupport.Kafka10), log: TestConfig.Log)) { - server.OnBytesReceived = data => - { + server.OnReceivedAsync = async data => { context = KafkaDecoder.DecodeHeader(data.Skip(KafkaEncoder.IntegerByteSize)); - var send = server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new FetchResponse())); + await server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new FetchResponse())); }; await conn.SendAsync(new FetchRequest(new FetchRequest.Topic("Foo", 0, 0)), CancellationToken.None); @@ -414,10 +418,84 @@ public async Task SendAsyncShouldUseStatictVersionInfo() } } + [Test] + public async Task SendAsyncWithDynamicVersionInfoMakesVersionCallFirst() + { + var firstCorrelation = -1; + var correlationId = 0; + var sentVersion = (short)-1; + + var endpoint = TestConfig.ServerEndpoint(); + using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) + using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromSeconds(3), versionSupport: VersionSupport.Kafka8.Dynamic()), log: TestConfig.Log)) + { + var apiVersion = (short)3; + server.OnReceivedAsync = async data => { + var context = KafkaDecoder.DecodeHeader(data.Skip(KafkaEncoder.IntegerByteSize)); + if (firstCorrelation < 0) + { + firstCorrelation = context.CorrelationId; + } + correlationId = context.CorrelationId; + switch (correlationId - firstCorrelation) + { + case 0: + await server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new ApiVersionsResponse(ErrorCode.None, new[] { new ApiVersionsResponse.VersionSupport(ApiKey.Fetch, apiVersion, apiVersion) }))); + break; + case 1: + sentVersion = context.ApiVersion.GetValueOrDefault(); + await server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new FetchResponse())); + break; + + default: + return; + } + }; + + await conn.SendAsync(new FetchRequest(new FetchRequest.Topic("Foo", 0, 0)), CancellationToken.None); + var receivedFetch = await TaskTest.WaitFor(() => correlationId - firstCorrelation >= 1); + + Assert.That(receivedFetch); + Assert.That(sentVersion, Is.EqualTo(apiVersion)); + } + } + + [Test] + public async Task SendAsyncWithDynamicVersionInfoOnlyMakesVersionCallOnce() + { + var versionRequests = 0; + + var endpoint = TestConfig.ServerEndpoint(); + using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) + using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromSeconds(3), versionSupport: VersionSupport.Kafka8.Dynamic()), log: TestConfig.Log)) + { + server.OnReceivedAsync = async data => { + var fullHeader = KafkaDecoder.DecodeFullHeader(data.Skip(KafkaEncoder.IntegerByteSize)); + var context = fullHeader.Item1; + switch (fullHeader.Item2) { + case ApiKey.ApiVersions: + Interlocked.Increment(ref versionRequests); + await server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new ApiVersionsResponse(ErrorCode.None, new[] { new ApiVersionsResponse.VersionSupport(ApiKey.Fetch, 3, 3) }))); + break; + + default: + await server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new FetchResponse())); + break; + } + }; + + for (var i = 0; i < 3; i++) { + await conn.SendAsync(new FetchRequest(new FetchRequest.Topic("Foo", 0, 0)), CancellationToken.None); + } + + Assert.That(versionRequests, Is.EqualTo(1)); + } + } + [Test] public async Task SendAsyncShouldTimeoutMultipleMessagesAtATime() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.FromMilliseconds(100)), log: TestConfig.Log)) { @@ -438,6 +516,103 @@ public async Task SendAsyncShouldTimeoutMultipleMessagesAtATime() } } + [Test] + public async Task MessagesStillLogWhenSendTimesOut() + { + var logger = new MemoryLog(); + var received = 0; + var timeout = TimeSpan.FromMilliseconds(100); + + var endpoint = TestConfig.ServerEndpoint(); + using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) + using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: timeout, onRead: (e, read, elapsed) => Interlocked.Increment(ref received)), logger)) + { + await Task.WhenAny(server.ClientConnected, Task.Delay(TimeSpan.FromSeconds(3))); + + server.OnReceivedAsync = async data => { + var context = KafkaDecoder.DecodeHeader(data.Skip(KafkaEncoder.IntegerByteSize)); + await Task.Delay(timeout); + await server.SendDataAsync(KafkaDecoder.EncodeResponseBytes(context, new MetadataResponse())); + }; + + try + { + await conn.SendAsync(new MetadataRequest(), CancellationToken.None); + Assert.Fail("Should have thrown TimeoutException"); + } catch (TimeoutException) { + // expected + } + + await TaskTest.WaitFor(() => received > 0); + + var hasLog = await TaskTest.WaitFor(() => logger.LogEvents.Any(e => e.Item1 == LogLevel.Debug && e.Item2.Message.StartsWith("Timed out -----> (timed out or otherwise errored in client)"))); + Assert.True(hasLog, "Wrote timed out message"); + } + } + + [Test] + public async Task TimedOutQueueIsClearedWhenTooBig() + { + var logger = new MemoryLog(); + var timeout = TimeSpan.FromMilliseconds(1); + + var endpoint = TestConfig.ServerEndpoint(); + using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) + using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: timeout), logger)) + { + await Task.WhenAny(server.ClientConnected, Task.Delay(TimeSpan.FromSeconds(3))); + + try { + // generate enough requests to overflow the buffer size + await Task.WhenAll(Enumerable.Range(0, 102).Select(i => conn.SendAsync(new MetadataRequest(), CancellationToken.None))); + Assert.Fail("Should have thrown TimeoutException"); + } catch (TimeoutException) { + // expected + } + + var hasLog = await TaskTest.WaitFor(() => logger.LogEvents.Any(e => e.Item1 == LogLevel.Debug && e.Item2.Message.StartsWith("Clearing timed out requests to avoid overflow"))); + Assert.True(hasLog, "Wrote timed out message"); + } + } + + [Test] + public async Task CorrelationOverflowGuardWorks() + { + var correlationId = -1; + + var endpoint = TestConfig.ServerEndpoint(); + using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) + using (var conn = new Connection(endpoint, new ConnectionConfiguration(requestTimeout: TimeSpan.Zero), new ConsoleLog())) { + server.OnReceivedAsync = data => { + var context = KafkaDecoder.DecodeHeader(data.Skip(KafkaEncoder.IntegerByteSize)); + correlationId = context.CorrelationId; + return Task.FromResult(0); + }; + + try { + Connection.OverflowGuard = 100; + + try { + await Task.WhenAll(Enumerable.Range(0, Connection.OverflowGuard).Select(i => conn.SendAsync(new MetadataRequest(), CancellationToken.None))); + Assert.Fail("Should have thrown TimeoutException"); + } catch (TimeoutException) { + // expected + } + Assert.That(await TaskTest.WaitFor(() => correlationId == Connection.OverflowGuard)); + try { + await conn.SendAsync(new MetadataRequest(), CancellationToken.None); + Assert.Fail("Should have thrown TimeoutException"); + } catch (TimeoutException) { + // expected + } + Assert.That(await TaskTest.WaitFor(() => correlationId == 1)); + } + finally { + Connection.OverflowGuard = int.MaxValue >> 1; + } + } + } + #endregion private static byte[] CreateCorrelationMessage(int id) diff --git a/src/KafkaClient.Tests/Unit/KafkaEndpointTests.cs b/src/KafkaClient.Tests/Unit/EndpointTests.cs similarity index 93% rename from src/KafkaClient.Tests/Unit/KafkaEndpointTests.cs rename to src/KafkaClient.Tests/Unit/EndpointTests.cs index 2bf969cc..5a12992c 100644 --- a/src/KafkaClient.Tests/Unit/KafkaEndpointTests.cs +++ b/src/KafkaClient.Tests/Unit/EndpointTests.cs @@ -7,10 +7,10 @@ namespace KafkaClient.Tests.Unit { [TestFixture] - public class KafkaEndpointTests + public class EndpointTests { [Test] - public async Task EnsureEndpointCanBeResulved() + public async Task EnsureEndpointCanBeResolved() { var expected = IPAddress.Parse("127.0.0.1"); var endpoint = await Endpoint.ResolveAsync(new Uri("http://localhost:8888"), TestConfig.Log); diff --git a/src/KafkaClient.Tests/Unit/KafkaReaderTests.cs b/src/KafkaClient.Tests/Unit/KafkaReaderTests.cs index c64fabd2..b4e344a9 100644 --- a/src/KafkaClient.Tests/Unit/KafkaReaderTests.cs +++ b/src/KafkaClient.Tests/Unit/KafkaReaderTests.cs @@ -1,6 +1,5 @@ using System; using System.IO; -using KafkaClient.Common; using KafkaClient.Protocol; using NUnit.Framework; diff --git a/src/KafkaClient.Tests/Unit/KafkaWriterTests.cs b/src/KafkaClient.Tests/Unit/KafkaWriterTests.cs index 64fc94b2..32662ed8 100644 --- a/src/KafkaClient.Tests/Unit/KafkaWriterTests.cs +++ b/src/KafkaClient.Tests/Unit/KafkaWriterTests.cs @@ -1,6 +1,5 @@ using System.IO; using System.Linq; -using KafkaClient.Common; using KafkaClient.Protocol; using NUnit.Framework; diff --git a/src/KafkaClient.Tests/Unit/ProtocolBaseRequestTests.cs b/src/KafkaClient.Tests/Unit/ProtocolBaseRequestTests.cs deleted file mode 100644 index e851f173..00000000 --- a/src/KafkaClient.Tests/Unit/ProtocolBaseRequestTests.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using KafkaClient.Protocol; -using NUnit.Framework; - -namespace KafkaClient.Tests.Unit -{ - [TestFixture] - public class ProtocolBaseRequestTests - { - [Test] - public void EnsureHeaderShouldPackCorrectByteLengths() - { - var result = KafkaEncoder.Encode(new RequestContext(123456789, clientId: "test"), new ApiVersionsRequest()); - - var withoutLength = new byte[result.Count - 4]; - Buffer.BlockCopy(result.Array, 4, withoutLength, 0, result.Count - 4); - Assert.That(withoutLength.Length, Is.EqualTo(14)); - Assert.That(withoutLength, Is.EqualTo(new byte[] { 0, 18, 0, 0, 7, 91, 205, 21, 0, 4, 116, 101, 115, 116 })); - } - } -} \ No newline at end of file diff --git a/src/KafkaClient.Tests/Unit/ProtocolByteTests.cs b/src/KafkaClient.Tests/Unit/ProtocolByteTests.cs deleted file mode 100644 index 435d5541..00000000 --- a/src/KafkaClient.Tests/Unit/ProtocolByteTests.cs +++ /dev/null @@ -1,873 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using KafkaClient.Assignment; -using KafkaClient.Common; -using KafkaClient.Protocol; -using KafkaClient.Testing; -using NUnit.Framework; -using NUnit.Framework.Internal; - -namespace KafkaClient.Tests.Unit -{ - /// - /// From http://kafka.apache.org/protocol.html#protocol_types - /// The protocol is built out of the following primitive types. - /// - /// Fixed Width Primitives: - /// int8, int16, int32, int64 - Signed integers with the given precision (in bits) stored in big endian order. - /// - /// Variable Length Primitives: - /// bytes, string - These types consist of a signed integer giving a length N followed by N bytes of content. - /// A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. - /// - /// Arrays: - /// This is a notation for handling repeated structures. These will always be encoded as an int32 size containing - /// the length N followed by N repetitions of the structure which can itself be made up of other primitive types. - /// In the BNF grammars below we will show an array of a structure foo as [foo]. - /// - /// Message formats are from https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-CommonRequestandResponseStructure - /// - /// RequestOrResponse => Size (RequestMessage | ResponseMessage) - /// Size => int32 : The Size field gives the size of the subsequent request or response message in bytes. - /// The client can read requests by first reading this 4 byte size as an integer N, and - /// then reading and parsing the subsequent N bytes of the request. - /// - /// Request Header => api_key api_version correlation_id client_id - /// api_key => INT16 -- The id of the request type. - /// api_version => INT16 -- The version of the API. - /// correlation_id => INT32 -- A user-supplied integer value that will be passed back with the response. - /// client_id => NULLABLE_STRING -- A user specified identifier for the client making the request. - /// - /// Response Header => correlation_id - /// correlation_id => INT32 -- The user-supplied value passed in with the request - /// - [TestFixture] - public class ProtocolByteTests - { - private readonly Randomizer _randomizer = new Randomizer(); - - [Test] - public void ProduceRequest( - [Values(0, 1, 2)] short version, - [Values(0, 2, -1)] short acks, - [Values(0, 1000)] int timeoutMilliseconds, - [Values("testTopic")] string topic, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int totalPartitions, - [Values(3)] int messagesPerSet, - [Values(MessageCodec.None, MessageCodec.Gzip, MessageCodec.Snappy)] MessageCodec codec) - { - var payloads = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var partition = 1 + t%totalPartitions; - payloads.Add(new ProduceRequest.Topic(topic + t, partition, GenerateMessages(messagesPerSet, (byte) (version >= 2 ? 1 : 0), codec), codec)); - } - var request = new ProduceRequest(payloads, TimeSpan.FromMilliseconds(timeoutMilliseconds), acks); - var requestWithUpdatedAttribute = new ProduceRequest(request.Topics.Select(t => new ProduceRequest.Topic(t.TopicName, t.PartitionId, - t.Messages.Select(m => m.Attribute == 0 ? m : new Message(m.Value, m.Key, 0, m.Offset, m.MessageVersion, m.Timestamp)))), - request.Timeout, request.Acks); - - var context = new RequestContext(16, version, "Test-Response"); - var data = KafkaEncoder.Encode(context, request); - var decoded = KafkaDecoder.Decode(data.Skip(4), context); - - // special case the comparison in the case of gzip because of the server semantics - if (!requestWithUpdatedAttribute.Equals(decoded)) { - var original = requestWithUpdatedAttribute.ToString(); - var final = decoded.ToString(); - Console.WriteLine($"Original\n{original}\nFinal\n{final}"); - Assert.That(final, Is.EqualTo(original)); - Assert.Fail("Not equal, although strings suggest they are?"); - } - } - - [Test] - public void ProduceResponse( - [Values(0, 1, 2)] short version, - [Values(-1, 0, 10000000)] long timestampMilliseconds, - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int totalPartitions, - [Values( - ErrorCode.None, - ErrorCode.CorruptMessage - )] ErrorCode errorCode, - [Values(0, 100000)] int throttleTime) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - topics.Add(new ProduceResponse.Topic(topicName + t, t % totalPartitions, errorCode, _randomizer.Next(), version >= 2 ? DateTimeOffset.FromUnixTimeMilliseconds(timestampMilliseconds) : (DateTimeOffset?)null)); - } - var response = new ProduceResponse(topics, version >= 1 ? TimeSpan.FromMilliseconds(throttleTime) : (TimeSpan?)null); - - response.AssertCanEncodeDecodeResponse(version); - } - - [Test] - public void FetchRequest( - [Values(0, 1, 2, 3)] short version, - [Values(0, 100)] int maxWaitMilliseconds, - [Values(0, 64000)] int minBytes, - [Values("testTopic")] string topic, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int totalPartitions, - [Values(25600000)] int maxBytes) - { - var fetches = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - fetches.Add(new FetchRequest.Topic(topic + t, t % totalPartitions, _randomizer.Next(0, int.MaxValue), maxBytes)); - } - var request = new FetchRequest(fetches, TimeSpan.FromMilliseconds(maxWaitMilliseconds), minBytes, version >= 3 ? maxBytes / _randomizer.Next(1, maxBytes) : 0); - request.AssertCanEncodeDecodeRequest(version); - } - - [Test] - public void FetchResponse( - [Values(0, 1, 2, 3)] short version, - [Values(0, 1234)] int throttleTime, - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int totalPartitions, - [Values(MessageCodec.None, MessageCodec.Gzip, MessageCodec.Snappy)] MessageCodec codec, - [Values( - ErrorCode.None, - ErrorCode.OffsetOutOfRange - )] ErrorCode errorCode, - [Values(3)] int messagesPerSet - ) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var partitionId = t % totalPartitions; - var messages = GenerateMessages(messagesPerSet, (byte) (version >= 2 ? 1 : 0), codec); - topics.Add(new FetchResponse.Topic(topicName + t, partitionId, _randomizer.Next(), errorCode, messages)); - } - var response = new FetchResponse(topics, version >= 1 ? TimeSpan.FromMilliseconds(throttleTime) : (TimeSpan?)null); - var responseWithUpdatedAttribute = new FetchResponse(response.Topics.Select(t => new FetchResponse.Topic(t.TopicName, t.PartitionId, t.HighWaterMark, t.ErrorCode, - t.Messages.Select(m => m.Attribute == 0 ? m : new Message(m.Value, m.Key, 0, m.Offset, m.MessageVersion, m.Timestamp)))), - response.ThrottleTime); - - var context = new RequestContext(16, version, "Test-Response"); - var data = KafkaDecoder.EncodeResponseBytes(context, response); - var decoded = KafkaEncoder.Decode(context, ApiKey.Fetch, data.Skip(KafkaEncoder.ResponseHeaderSize)); - - // special case the comparison in the case of gzip because of the server semantics - if (!responseWithUpdatedAttribute.Equals(decoded)) { - var original = responseWithUpdatedAttribute.ToString(); - var final = decoded.ToString(); - Console.WriteLine($"Original\n{original}\nFinal\n{final}"); - Assert.That(final, Is.EqualTo(original)); - Assert.Fail("Not equal, although strings suggest they are?"); - } - } - - [Test] - public void OffsetsRequest( - [Values(0, 1)] short version, - [Values("testTopic")] string topic, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int totalPartitions, - [Values(-2, -1, 123456, 10000000)] long time, - [Values(1, 10)] int maxOffsets) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var offset = new OffsetRequest.Topic(topic + t, t % totalPartitions, time, version == 0 ? maxOffsets : 1); - topics.Add(offset); - } - var request = new OffsetRequest(topics); - - request.AssertCanEncodeDecodeRequest(version); - } - - [Test] - public void OffsetsResponse( - [Values(0, 1)] short version, - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(5)] int totalPartitions, - [Values( - ErrorCode.UnknownTopicOrPartition, - ErrorCode.NotLeaderForPartition, - ErrorCode.Unknown - )] ErrorCode errorCode, - [Values(1, 5)] int offsetsPerPartition) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var partitionId = t % totalPartitions; - for (var o = 0; o < offsetsPerPartition; o++) { - topics.Add(new OffsetResponse.Topic(topicName + t, partitionId, errorCode, _randomizer.Next(-1, int.MaxValue), version >= 1 ? (DateTimeOffset?)DateTimeOffset.UtcNow : null)); - } - } - var response = new OffsetResponse(topics); - - response.AssertCanEncodeDecodeResponse(version); - } - - [Test] - public void MetadataRequest( - [Values("testTopic")] string topic, - [Values(0, 1, 10)] int topicsPerRequest) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - topics.Add(topic + t); - } - var request = new MetadataRequest(topics); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void MetadataResponse( - [Values(0, 1, 2)] short version, - [Values(1, 15)] int brokersPerRequest, - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int partitionsPerTopic, - [Values( - ErrorCode.None, - ErrorCode.UnknownTopicOrPartition - )] ErrorCode errorCode) - { - var brokers = new List(); - for (var b = 0; b < brokersPerRequest; b++) { - string rack = null; - if (version >= 1) { - rack = "Rack" + b; - } - brokers.Add(new KafkaClient.Protocol.Broker(b, "broker-" + b, 9092 + b, rack)); - } - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var partitions = new List(); - for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { - var leader = _randomizer.Next(0, brokersPerRequest - 1); - var replica = 0; - var replicas = _randomizer.Next(0, brokersPerRequest - 1).Repeat(() => replica++); - var isr = 0; - var isrs = _randomizer.Next(0, replica).Repeat(() => isr++); - partitions.Add(new MetadataResponse.Partition(partitionId, leader, errorCode, replicas, isrs)); - } - topics.Add(new MetadataResponse.Topic(topicName + t, errorCode, partitions, version >= 1 ? topicsPerRequest%2 == 0 : (bool?)null)); - } - var response = new MetadataResponse(brokers, topics, version >= 1 ? brokersPerRequest : (int?)null, version >= 2 ? $"cluster-{version}" : null); - - response.AssertCanEncodeDecodeResponse(version); - } - - [Test] - public void OffsetCommitRequest( - [Values(0, 1, 2)] short version, - [Values("group1", "group2")] string groupId, - [Values(0, 5)] int generation, - [Values(-1, 20000)] int retentionTime, - [Values("testTopic")] string topic, - [Values(1, 10)] int topicsPerRequest, - [Values(5)] int maxPartitions, - [Values(10)] int maxOffsets, - [Values(null, "something useful for the client")] string metadata) - { - var timestamp = retentionTime; - var offsetCommits = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - offsetCommits.Add(new OffsetCommitRequest.Topic( - topic + t, - t%maxPartitions, - _randomizer.Next(0, int.MaxValue), - metadata, - version == 1 ? timestamp : (long?)null)); - } - var request = new OffsetCommitRequest( - groupId, - offsetCommits, - version >= 1 ? "member" + generation : null, - version >= 1 ? generation : 0, - version >= 2 && retentionTime >= 0 ? (TimeSpan?) TimeSpan.FromMilliseconds(retentionTime) : null); - - request.AssertCanEncodeDecodeRequest(version); - } - - [Test] - public void OffsetCommitResponse( - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int partitionsPerTopic, - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { - topics.Add(new TopicResponse(topicName + t, partitionId, errorCode)); - } - } - var response = new OffsetCommitResponse(topics); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void OffsetFetchRequest( - [Values("group1", "group2")] string groupId, - [Values("testTopic")] string topic, - [Values(1, 10)] int topicsPerRequest, - [Values(5)] int maxPartitions) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - topics.Add(new TopicPartition(topic + t, t % maxPartitions)); - } - var request = new OffsetFetchRequest(groupId, topics); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void OffsetFetchResponse( - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int partitionsPerTopic, - [Values( - ErrorCode.None, - ErrorCode.UnknownTopicOrPartition, - ErrorCode.GroupLoadInProgress, - ErrorCode.NotCoordinatorForGroup, - ErrorCode.IllegalGeneration, - ErrorCode.UnknownMemberId, - ErrorCode.TopicAuthorizationFailed, - ErrorCode.GroupAuthorizationFailed - )] ErrorCode errorCode) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { - var offset = (long)_randomizer.Next(int.MinValue, int.MaxValue); - topics.Add(new OffsetFetchResponse.Topic(topicName + t, partitionId, errorCode, offset, offset >= 0 ? topicName : string.Empty)); - } - } - var response = new OffsetFetchResponse(topics); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void GroupCoordinatorRequest([Values("group1", "group2")] string groupId) - { - var request = new GroupCoordinatorRequest(groupId); - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void GroupCoordinatorResponse( - [Values( - ErrorCode.None, - ErrorCode.GroupCoordinatorNotAvailable, - ErrorCode.GroupAuthorizationFailed - )] ErrorCode errorCode, - [Values(0, 1)] int coordinatorId - ) - { - var response = new GroupCoordinatorResponse(errorCode, coordinatorId, "broker-" + coordinatorId, 9092 + coordinatorId); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void ApiVersionsRequest() - { - var request = new ApiVersionsRequest(); - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void ApiVersionsResponse( - [Values( - ErrorCode.None, - ErrorCode.BrokerNotAvailable - )] ErrorCode errorCode - ) - { - var supported = new List(); - for (short apiKey = 0; apiKey <= 18; apiKey++) { - supported.Add(new ApiVersionsResponse.VersionSupport((ApiKey)apiKey, 0, (short)_randomizer.Next(0, 2))); - } - var response = new ApiVersionsResponse(errorCode, supported); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void JoinGroupRequest( - [Values(0, 1)] short version, - [Values("test", "a groupId")] string groupId, - [Values(1, 20000)] int sessionTimeout, - [Values("", "an existing member")] string memberId, - [Values("consumer", "other")] string protocolType, - [Values(1, 10)] int protocolsPerRequest) - { - var protocols = new List(); - for (var p = 0; p < protocolsPerRequest; p++) { - var bytes = new byte[protocolsPerRequest*100]; - _randomizer.NextBytes(bytes); - protocols.Add(new JoinGroupRequest.GroupProtocol(new ByteTypeMetadata("known", new ArraySegment(bytes)))); - } - var request = new JoinGroupRequest(groupId, TimeSpan.FromMilliseconds(sessionTimeout), memberId, protocolType, protocols, version >= 1 ? (TimeSpan?)TimeSpan.FromMilliseconds(sessionTimeout * 2) : null); - - request.AssertCanEncodeDecodeRequest(version, new ByteMembershipEncoder(protocolType)); - } - - [Test] - public void JoinGroupResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Values(0, 1, 20000)] int generationId, - [Values("consumer", "other")] string protocol, - [Values("test", "a groupId")] string leaderId, - [Values("", "an existing member")] string memberId, - [Values(1, 10)] int memberCount) - { - var members = new List(); - for (var m = 0; m < memberCount; m++) { - var bytes = new byte[memberCount*100]; - _randomizer.NextBytes(bytes); - members.Add(new JoinGroupResponse.Member(memberId + m, new ByteTypeMetadata("known", new ArraySegment(bytes)))); - } - var response = new JoinGroupResponse(errorCode, generationId, "known", leaderId, memberId, members); - - response.AssertCanEncodeDecodeResponse(0, new ByteMembershipEncoder(protocol)); - } - - [Test] - public void JoinConsumerGroupRequest( - [Values("test", "a groupId")] string groupId, - [Values(1, 20000)] int sessionTimeout, - [Values("", "an existing member")] string memberId, - [Values("mine", "yours")] string protocol, - [Values(1, 10)] int protocolsPerRequest) - { - var encoder = new ConsumerEncoder(); - var protocols = new List(); - for (var p = 0; p < protocolsPerRequest; p++) { - var userData = new byte[protocolsPerRequest*100]; - _randomizer.NextBytes(userData); - var metadata = new ConsumerProtocolMetadata(new []{ groupId, memberId, protocol }, protocol + p, new ArraySegment(userData), 0); - protocols.Add(new JoinGroupRequest.GroupProtocol(metadata)); - } - var request = new JoinGroupRequest(groupId, TimeSpan.FromMilliseconds(sessionTimeout), memberId, ConsumerEncoder.Protocol, protocols); - - request.AssertCanEncodeDecodeRequest(0, encoder); - } - - [Test] - public void JoinConsumerGroupResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Values(0, 1, 20000)] int generationId, - [Values("consumer")] string protocol, - [Values("test", "a groupId")] string leaderId, - [Values("", "an existing member")] string memberId, - [Values(1, 10)] int memberCount) - { - var encoder = new ConsumerEncoder(); - var members = new List(); - for (var m = 0; m < memberCount; m++) { - var userData = new byte[memberCount*100]; - _randomizer.NextBytes(userData); - var metadata = new ConsumerProtocolMetadata(new []{ protocol, memberId, leaderId }, protocol, new ArraySegment(userData), 0); - members.Add(new JoinGroupResponse.Member(memberId + m, metadata)); - } - var response = new JoinGroupResponse(errorCode, generationId, protocol, leaderId, memberId, members); - - response.AssertCanEncodeDecodeResponse(0, encoder); - } - - [Test] - public void HeartbeatRequest( - [Values("test", "a groupId")] string groupId, - [Values(0, 1, 20000)] int generationId, - [Values("", "an existing member")] string memberId) - { - var request = new HeartbeatRequest(groupId, generationId, memberId); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void HeartbeatResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode) - { - var response = new HeartbeatResponse(errorCode); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void LeaveGroupRequest( - [Values("test", "a groupId")] string groupId, - [Values("", "an existing member")] string memberId) - { - var request = new LeaveGroupRequest(groupId, memberId); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void LeaveGroupResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode) - { - var response = new LeaveGroupResponse(errorCode); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void SyncGroupRequest( - [Values("test", "a groupId")] string groupId, - [Values(0, 1, 20000)] int generationId, - [Values("", "an existing member")] string memberId, - [Values("consumer", "other")] string protocolType, - [Values(1, 10)] int assignmentsPerRequest) - { - var assignments = new List(); - for (var a = 0; a < assignmentsPerRequest; a++) { - var bytes = new byte[assignmentsPerRequest*100]; - _randomizer.NextBytes(bytes); - assignments.Add(new SyncGroupRequest.GroupAssignment(protocolType + a, new ByteTypeAssignment(new ArraySegment(bytes)))); - } - var request = new SyncGroupRequest(groupId, generationId, memberId, assignments); - - request.AssertCanEncodeDecodeRequest(0, new ByteMembershipEncoder(protocolType)); - } - - [Test] - public void SyncGroupResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode) - { - var bytes = new byte[1000]; - _randomizer.NextBytes(bytes); - var response = new SyncGroupResponse(errorCode, new ByteTypeAssignment(new ArraySegment(bytes))); - - response.AssertCanEncodeDecodeResponse(0, new ByteMembershipEncoder("protocolType")); - } - - [Test] - public void SyncConsumerGroupRequest( - [Values("test", "a groupId")] string groupId, - [Values(0, 1, 20000)] int generationId, - [Values("", "an existing member")] string memberId, - [Values("consumer")] string protocolType, - [Values(1, 10)] int assignmentsPerRequest) - { - var encoder = new ConsumerEncoder(); - var assignments = new List(); - for (var a = 0; a < assignmentsPerRequest; a++) { - var topics = new List(); - for (var t = 0; t < assignmentsPerRequest; t++) { - topics.Add(new TopicPartition(groupId + t, t)); - } - var userData = new byte[assignmentsPerRequest*100]; - _randomizer.NextBytes(userData); - var assignment = new ConsumerMemberAssignment(topics, new ArraySegment(userData), 0); - assignments.Add(new SyncGroupRequest.GroupAssignment(protocolType + a, assignment)); - } - var request = new SyncGroupRequest(groupId, generationId, memberId, assignments); - - request.AssertCanEncodeDecodeRequest(0, encoder); - } - - [Test] - public void SyncConsumerGroupResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Values(1, 10)] int memberCount) - { - var encoder = new ConsumerEncoder(); - var topics = new List(); - for (var t = 0; t < memberCount; t++) { - topics.Add(new TopicPartition("topic foo" + t, t)); - } - var userData = new byte[memberCount*100]; - _randomizer.NextBytes(userData); - var assignment = new ConsumerMemberAssignment(topics, new ArraySegment(userData), 0); - var response = new SyncGroupResponse(errorCode, assignment); - - response.AssertCanEncodeDecodeResponse(0, encoder); - } - - [Test] - public void DescribeGroupsRequest( - [Values("test", "a groupId")] string groupId, - [Range(1, 10)] int count) - { - var groups = new string[count]; - for (var g = 0; g < count; g++) { - groups[g] = groupId + g; - } - var request = new DescribeGroupsRequest(groups); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void DescribeGroupsResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Values("test", "a groupId")] string groupId, - [Range(2, 3)] int count, - [Values(KafkaClient.Protocol.DescribeGroupsResponse.Group.States.Stable, KafkaClient.Protocol.DescribeGroupsResponse.Group.States.Dead)] string state, - [Values("consumer", "unknown")] string protocolType, - [Values("good", "bad", "ugly")] string protocol) - { - var groups = new DescribeGroupsResponse.Group[count]; - for (var g = 0; g < count; g++) { - var members = new List(); - for (var m = 0; m < count; m++) { - var metadata = new byte[count*100]; - var assignment = new byte[count*10]; - _randomizer.NextBytes(metadata); - _randomizer.NextBytes(assignment); - - members.Add(new DescribeGroupsResponse.Member("member" + m, "client" + m, "host-" + m, new ByteTypeMetadata(protocol, new ArraySegment(metadata)), new ByteTypeAssignment(new ArraySegment(assignment)))); - } - groups[g] = new DescribeGroupsResponse.Group(errorCode, groupId + g, state, protocolType, protocol, members); - } - var response = new DescribeGroupsResponse(groups); - - response.AssertCanEncodeDecodeResponse(0, new ByteMembershipEncoder(protocolType)); - } - - [Test] - public void DescribeConsumerGroupsResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Values("test", "a groupId")] string groupId, - [Range(2, 3)] int count, - [Values(KafkaClient.Protocol.DescribeGroupsResponse.Group.States.Stable, Protocol.DescribeGroupsResponse.Group.States.AwaitingSync)] string state, - [Values("consumer")] string protocolType, - [Values("good", "bad", "ugly")] string protocol) - { - var encoder = new ConsumerEncoder(); - var groups = new DescribeGroupsResponse.Group[count]; - for (var g = 0; g < count; g++) { - var members = new List(); - for (var m = 0; m < count; m++) { - var memberId = "member" + m; - var userData = new byte[count*100]; - _randomizer.NextBytes(userData); - var metadata = new ConsumerProtocolMetadata(new []{ protocol, memberId, memberId }, protocol, new ArraySegment(userData), 0); - - var topics = new List(); - for (var t = 0; t < count; t++) { - topics.Add(new TopicPartition("topic foo" + t, t)); - } - var assignment = new ConsumerMemberAssignment(topics, new ArraySegment(userData), 0); - - members.Add(new DescribeGroupsResponse.Member(memberId, "client" + m, "host-" + m, metadata, assignment)); - } - groups[g] = new DescribeGroupsResponse.Group(errorCode, groupId + g, state, protocolType, protocol, members); - } - var response = new DescribeGroupsResponse(groups); - - response.AssertCanEncodeDecodeResponse(0, encoder); - } - - [Test] - public void ListGroupsRequest() - { - var request = new ListGroupsRequest(); - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void ListGroupsResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Values("test", "a groupId")] string groupId, - [Range(2, 3)] int count, - [Values("consumer")] string protocolType) - { - var groups = new ListGroupsResponse.Group[count]; - for (var g = 0; g < count; g++) { - groups[g] = new ListGroupsResponse.Group(groupId + g, protocolType); - } - var response = new ListGroupsResponse(errorCode, groups); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void SaslHandshakeRequest( - [Values("EXTERNAL", "ANONYMOUS", "PLAIN", "OTP", "SKEY", "CRAM-MD5", "DIGEST-MD5", "SCRAM", "NTLM", "GSSAPI", "OAUTHBEARER")] string mechanism) - { - var request = new SaslHandshakeRequest(mechanism); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void SaslHandshakeResponse( - [Values( - ErrorCode.None, - ErrorCode.OffsetMetadataTooLarge - )] ErrorCode errorCode, - [Range(1, 11)] int count) - { - var mechanisms = new[] { "EXTERNAL", "ANONYMOUS", "PLAIN", "OTP", "SKEY", "CRAM-MD5", "DIGEST-MD5", "SCRAM", "NTLM", "GSSAPI", "OAUTHBEARER" }; - var response = new SaslHandshakeResponse(errorCode, mechanisms.Take(count)); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void DeleteTopicsRequest( - [Values("test", "anotherNameForATopic")] string topicName, - [Range(2, 3)] int count, - [Values(0, 1, 20000)] int timeoutMilliseconds) - { - var topics = new string[count]; - for (var t = 0; t < count; t++) { - topics[t] = topicName + t; - } - var request = new DeleteTopicsRequest(topics, TimeSpan.FromMilliseconds(timeoutMilliseconds)); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void DeleteTopicsResponse( - [Values( - ErrorCode.None, - ErrorCode.NotController - )] ErrorCode errorCode, - [Values("test", "anotherNameForATopic")] string topicName, - [Range(1, 11)] int count) - { - var topics = new TopicsResponse.Topic[count]; - for (var t = 0; t < count; t++) { - topics[t] = new TopicsResponse.Topic(topicName + t, errorCode); - } - var response = new DeleteTopicsResponse(topics); - - response.AssertCanEncodeDecodeResponse(0); - } - - [Test] - public void CreateTopicsRequest( - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int partitionsPerTopic, - [Values(1, 3)] short replicationFactor, - [Values(0, 3)] int configCount, - [Values(0, 1, 20000)] int timeoutMilliseconds) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var configs = new Dictionary(); - for (var c = 0; c < configCount; c++) { - configs["config-" + c] = Guid.NewGuid().ToString("N"); - } - if (configs.Count == 0 && _randomizer.NextBool()) { - configs = null; - } - topics.Add(new CreateTopicsRequest.Topic(topicName + t, partitionsPerTopic, replicationFactor, configs)); - } - var request = new CreateTopicsRequest(topics, TimeSpan.FromMilliseconds(timeoutMilliseconds)); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void CreateTopicsExplicitRequest( - [Values("testTopic")] string topicName, - [Values(1, 10)] int topicsPerRequest, - [Values(1, 5)] int partitionsPerTopic, - [Values(1, 3)] short replicationFactor, - [Values(0, 3)] int configCount, - [Values(0, 1, 20000)] int timeoutMilliseconds) - { - var topics = new List(); - for (var t = 0; t < topicsPerRequest; t++) { - var configs = new Dictionary(); - for (var c = 0; c < configCount; c++) { - configs["config-" + c] = Guid.NewGuid().ToString("N"); - } - if (configs.Count == 0 && _randomizer.NextBool()) { - configs = null; - } - - var assignments = new List(); - for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { - var replica = 0; - var replicas = _randomizer.Next(0, replicationFactor - 1).Repeat(() => replica++); - assignments.Add(new CreateTopicsRequest.ReplicaAssignment(partitionId, replicas)); - } - topics.Add(new CreateTopicsRequest.Topic(topicName + t, assignments, configs)); - } - var request = new CreateTopicsRequest(topics, TimeSpan.FromMilliseconds(timeoutMilliseconds)); - - request.AssertCanEncodeDecodeRequest(0); - } - - [Test] - public void CreateTopicsResponse( - [Values( - ErrorCode.None, - ErrorCode.InvalidTopic, - ErrorCode.InvalidPartitions - )] ErrorCode errorCode, - [Values("test", "anotherNameForATopic")] string topicName, - [Range(1, 11)] int count) - { - var topics = new TopicsResponse.Topic[count]; - for (var t = 0; t < count; t++) { - topics[t] = new TopicsResponse.Topic(topicName + t, errorCode); - } - var response = new CreateTopicsResponse(topics); - - response.AssertCanEncodeDecodeResponse(0); - } - - private IEnumerable GenerateMessages(int count, byte version, MessageCodec codec = MessageCodec.None) - { - var random = new Random(42); - var messages = new List(); - for (var m = 0; m < count; m++) { - var key = m > 0 ? new byte[8] : null; - var value = new byte[8*(m + 1)]; - if (key != null) { - random.NextBytes(key); - } - random.NextBytes(value); - - messages.Add(new Message(new ArraySegment(value), key != null ? new ArraySegment(key) : new ArraySegment(), (byte)codec, version: version, timestamp: version > 0 ? DateTimeOffset.UtcNow : (DateTimeOffset?)null)); - } - return messages; - } - } -} \ No newline at end of file diff --git a/src/KafkaClient.Tests/Unit/ProtocolMessageTests.cs b/src/KafkaClient.Tests/Unit/ProtocolMessageTests.cs index eaadc6b8..854f94bf 100644 --- a/src/KafkaClient.Tests/Unit/ProtocolMessageTests.cs +++ b/src/KafkaClient.Tests/Unit/ProtocolMessageTests.cs @@ -1,7 +1,5 @@ using System; using System.Linq; -using System.Text; -using KafkaClient.Common; using KafkaClient.Protocol; using NUnit.Framework; diff --git a/src/KafkaClient.Tests/Unit/ProtocolTests.cs b/src/KafkaClient.Tests/Unit/ProtocolTests.cs index fecc8ca2..e7f33a09 100644 --- a/src/KafkaClient.Tests/Unit/ProtocolTests.cs +++ b/src/KafkaClient.Tests/Unit/ProtocolTests.cs @@ -1,28 +1,884 @@ -using KafkaClient.Assignment; +using System; +using System.Collections.Generic; +using System.Linq; +using KafkaClient.Assignment; using KafkaClient.Common; using KafkaClient.Protocol; +using KafkaClient.Testing; using NUnit.Framework; +using NUnit.Framework.Internal; namespace KafkaClient.Tests.Unit { + /// + /// From http://kafka.apache.org/protocol.html#protocol_types + /// The protocol is built out of the following primitive types. + /// + /// Fixed Width Primitives: + /// int8, int16, int32, int64 - Signed integers with the given precision (in bits) stored in big endian order. + /// + /// Variable Length Primitives: + /// bytes, string - These types consist of a signed integer giving a length N followed by N bytes of content. + /// A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. + /// + /// Arrays: + /// This is a notation for handling repeated structures. These will always be encoded as an int32 size containing + /// the length N followed by N repetitions of the structure which can itself be made up of other primitive types. + /// In the BNF grammars below we will show an array of a structure foo as [foo]. + /// + /// Message formats are from https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-CommonRequestandResponseStructure + /// + /// RequestOrResponse => Size (RequestMessage | ResponseMessage) + /// Size => int32 : The Size field gives the size of the subsequent request or response message in bytes. + /// The client can read requests by first reading this 4 byte size as an integer N, and + /// then reading and parsing the subsequent N bytes of the request. + /// + /// Request Header => api_key api_version correlation_id client_id + /// api_key => INT16 -- The id of the request type. + /// api_version => INT16 -- The version of the API. + /// correlation_id => INT32 -- A user-supplied integer value that will be passed back with the response. + /// client_id => NULLABLE_STRING -- A user specified identifier for the client making the request. + /// + /// Response Header => correlation_id + /// correlation_id => INT32 -- The user-supplied value passed in with the request + /// [TestFixture] public class ProtocolTests { + private readonly Randomizer _randomizer = new Randomizer(); + + [Test] + public void HeaderShouldCorrectPackByteLengths() + { + var result = KafkaEncoder.Encode(new RequestContext(123456789, clientId: "test"), new ApiVersionsRequest()); + + var withoutLength = new byte[result.Count - 4]; + Buffer.BlockCopy(result.Array, 4, withoutLength, 0, result.Count - 4); + Assert.That(withoutLength.Length, Is.EqualTo(14)); + Assert.That(withoutLength, Is.EqualTo(new byte[] { 0, 18, 0, 0, 7, 91, 205, 21, 0, 4, 116, 101, 115, 116 })); + } + + [Test] + public void ProduceRequest( + [Values(0, 1, 2)] short version, + [Values(0, 2, -1)] short acks, + [Values(0, 1000)] int timeoutMilliseconds, + [Values("testTopic")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values(3)] int messagesPerSet, + [Values(MessageCodec.None, MessageCodec.Gzip, MessageCodec.Snappy)] MessageCodec codec) + { + var payloads = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var partition = 1 + t%totalPartitions; + payloads.Add(new ProduceRequest.Topic(topic + t, partition, GenerateMessages(messagesPerSet, (byte) (version >= 2 ? 1 : 0), codec), codec)); + } + var request = new ProduceRequest(payloads, TimeSpan.FromMilliseconds(timeoutMilliseconds), acks); + var requestWithUpdatedAttribute = new ProduceRequest(request.Topics.Select(t => new ProduceRequest.Topic(t.TopicName, t.PartitionId, + t.Messages.Select(m => m.Attribute == 0 ? m : new Message(m.Value, m.Key, 0, m.Offset, m.MessageVersion, m.Timestamp)))), + request.Timeout, request.Acks); + + var context = new RequestContext(16, version, "Test-Response"); + var data = KafkaEncoder.Encode(context, request); + var decoded = KafkaDecoder.Decode(data.Skip(4), context); + + // special case the comparison in the case of gzip because of the server semantics + if (!requestWithUpdatedAttribute.Equals(decoded)) { + var original = requestWithUpdatedAttribute.ToString(); + var final = decoded.ToString(); + Console.WriteLine($"Original\n{original}\nFinal\n{final}"); + Assert.That(final, Is.EqualTo(original)); + Assert.Fail("Not equal, although strings suggest they are?"); + } + } + + [Test] + public void ProduceResponse( + [Values(0, 1, 2)] short version, + [Values(-1, 0, 10000000)] long timestampMilliseconds, + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values( + ErrorCode.None, + ErrorCode.CorruptMessage + )] ErrorCode errorCode, + [Values(0, 100000)] int throttleTime) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + topics.Add(new ProduceResponse.Topic(topicName + t, t % totalPartitions, errorCode, _randomizer.Next(), version >= 2 ? DateTimeOffset.FromUnixTimeMilliseconds(timestampMilliseconds) : (DateTimeOffset?)null)); + } + var response = new ProduceResponse(topics, version >= 1 ? TimeSpan.FromMilliseconds(throttleTime) : (TimeSpan?)null); + + response.AssertCanEncodeDecodeResponse(version); + } + + [Test] + public void FetchRequest( + [Values(0, 1, 2, 3)] short version, + [Values(0, 100)] int maxWaitMilliseconds, + [Values(0, 64000)] int minBytes, + [Values("testTopic")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values(25600000)] int maxBytes) + { + var fetches = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + fetches.Add(new FetchRequest.Topic(topic + t, t % totalPartitions, _randomizer.Next(0, int.MaxValue), maxBytes)); + } + var request = new FetchRequest(fetches, TimeSpan.FromMilliseconds(maxWaitMilliseconds), minBytes, version >= 3 ? maxBytes / _randomizer.Next(1, maxBytes) : 0); + request.AssertCanEncodeDecodeRequest(version); + } + + [Test] + public void FetchResponse( + [Values(0, 1, 2, 3)] short version, + [Values(0, 1234)] int throttleTime, + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values(MessageCodec.None, MessageCodec.Gzip, MessageCodec.Snappy)] MessageCodec codec, + [Values( + ErrorCode.None, + ErrorCode.OffsetOutOfRange + )] ErrorCode errorCode, + [Values(3)] int messagesPerSet + ) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var partitionId = t % totalPartitions; + var messages = GenerateMessages(messagesPerSet, (byte) (version >= 2 ? 1 : 0), codec); + topics.Add(new FetchResponse.Topic(topicName + t, partitionId, _randomizer.Next(), errorCode, messages)); + } + var response = new FetchResponse(topics, version >= 1 ? TimeSpan.FromMilliseconds(throttleTime) : (TimeSpan?)null); + var responseWithUpdatedAttribute = new FetchResponse(response.Topics.Select(t => new FetchResponse.Topic(t.TopicName, t.PartitionId, t.HighWaterMark, t.ErrorCode, + t.Messages.Select(m => m.Attribute == 0 ? m : new Message(m.Value, m.Key, 0, m.Offset, m.MessageVersion, m.Timestamp)))), + response.ThrottleTime); + + var context = new RequestContext(16, version, "Test-Response"); + var data = KafkaDecoder.EncodeResponseBytes(context, response); + var decoded = KafkaEncoder.Decode(context, ApiKey.Fetch, data.Skip(KafkaEncoder.ResponseHeaderSize)); + + // special case the comparison in the case of gzip because of the server semantics + if (!responseWithUpdatedAttribute.Equals(decoded)) { + var original = responseWithUpdatedAttribute.ToString(); + var final = decoded.ToString(); + Console.WriteLine($"Original\n{original}\nFinal\n{final}"); + Assert.That(final, Is.EqualTo(original)); + Assert.Fail("Not equal, although strings suggest they are?"); + } + } + + [Test] + public void OffsetsRequest( + [Values(0, 1)] short version, + [Values("testTopic")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values(-2, -1, 123456, 10000000)] long time, + [Values(1, 10)] int maxOffsets) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var offset = new OffsetRequest.Topic(topic + t, t % totalPartitions, time, version == 0 ? maxOffsets : 1); + topics.Add(offset); + } + var request = new OffsetRequest(topics); + + request.AssertCanEncodeDecodeRequest(version); + } + + [Test] + public void OffsetsResponse( + [Values(0, 1)] short version, + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(5)] int totalPartitions, + [Values( + ErrorCode.UnknownTopicOrPartition, + ErrorCode.NotLeaderForPartition, + ErrorCode.Unknown + )] ErrorCode errorCode, + [Values(1, 5)] int offsetsPerPartition) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var partitionId = t % totalPartitions; + for (var o = 0; o < offsetsPerPartition; o++) { + topics.Add(new OffsetResponse.Topic(topicName + t, partitionId, errorCode, _randomizer.Next(-1, int.MaxValue), version >= 1 ? (DateTimeOffset?)DateTimeOffset.UtcNow : null)); + } + } + var response = new OffsetResponse(topics); + + response.AssertCanEncodeDecodeResponse(version); + } + + [Test] + public void MetadataRequest( + [Values("testTopic")] string topic, + [Values(0, 1, 10)] int topicsPerRequest) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + topics.Add(topic + t); + } + var request = new MetadataRequest(topics); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void MetadataResponse( + [Values(0, 1, 2)] short version, + [Values(1, 15)] int brokersPerRequest, + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int partitionsPerTopic, + [Values( + ErrorCode.None, + ErrorCode.UnknownTopicOrPartition + )] ErrorCode errorCode) + { + var brokers = new List(); + for (var b = 0; b < brokersPerRequest; b++) { + string rack = null; + if (version >= 1) { + rack = "Rack" + b; + } + brokers.Add(new KafkaClient.Protocol.Broker(b, "broker-" + b, 9092 + b, rack)); + } + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var partitions = new List(); + for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { + var leader = _randomizer.Next(0, brokersPerRequest - 1); + var replica = 0; + var replicas = _randomizer.Next(0, brokersPerRequest - 1).Repeat(() => replica++); + var isr = 0; + var isrs = _randomizer.Next(0, replica).Repeat(() => isr++); + partitions.Add(new MetadataResponse.Partition(partitionId, leader, errorCode, replicas, isrs)); + } + topics.Add(new MetadataResponse.Topic(topicName + t, errorCode, partitions, version >= 1 ? topicsPerRequest%2 == 0 : (bool?)null)); + } + var response = new MetadataResponse(brokers, topics, version >= 1 ? brokersPerRequest : (int?)null, version >= 2 ? $"cluster-{version}" : null); + + response.AssertCanEncodeDecodeResponse(version); + } + + [Test] + public void OffsetCommitRequest( + [Values(0, 1, 2)] short version, + [Values("group1", "group2")] string groupId, + [Values(0, 5)] int generation, + [Values(-1, 20000)] int retentionTime, + [Values("testTopic")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(5)] int maxPartitions, + [Values(10)] int maxOffsets, + [Values(null, "something useful for the client")] string metadata) + { + var timestamp = retentionTime; + var offsetCommits = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + offsetCommits.Add(new OffsetCommitRequest.Topic( + topic + t, + t%maxPartitions, + _randomizer.Next(0, int.MaxValue), + metadata, + version == 1 ? timestamp : (long?)null)); + } + var request = new OffsetCommitRequest( + groupId, + offsetCommits, + version >= 1 ? "member" + generation : null, + version >= 1 ? generation : 0, + version >= 2 && retentionTime >= 0 ? (TimeSpan?) TimeSpan.FromMilliseconds(retentionTime) : null); + + request.AssertCanEncodeDecodeRequest(version); + } + + [Test] + public void OffsetCommitResponse( + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int partitionsPerTopic, + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { + topics.Add(new TopicResponse(topicName + t, partitionId, errorCode)); + } + } + var response = new OffsetCommitResponse(topics); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void OffsetFetchRequest( + [Values("group1", "group2")] string groupId, + [Values("testTopic")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(5)] int maxPartitions) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + topics.Add(new TopicPartition(topic + t, t % maxPartitions)); + } + var request = new OffsetFetchRequest(groupId, topics); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void OffsetFetchResponse( + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int partitionsPerTopic, + [Values( + ErrorCode.None, + ErrorCode.UnknownTopicOrPartition, + ErrorCode.GroupLoadInProgress, + ErrorCode.NotCoordinatorForGroup, + ErrorCode.IllegalGeneration, + ErrorCode.UnknownMemberId, + ErrorCode.TopicAuthorizationFailed, + ErrorCode.GroupAuthorizationFailed + )] ErrorCode errorCode) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { + var offset = (long)_randomizer.Next(int.MinValue, int.MaxValue); + topics.Add(new OffsetFetchResponse.Topic(topicName + t, partitionId, errorCode, offset, offset >= 0 ? topicName : string.Empty)); + } + } + var response = new OffsetFetchResponse(topics); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void GroupCoordinatorRequest([Values("group1", "group2")] string groupId) + { + var request = new GroupCoordinatorRequest(groupId); + request.AssertCanEncodeDecodeRequest(0); + } + [Test] - public void MetadataResponseShouldDecode() + public void GroupCoordinatorResponse( + [Values( + ErrorCode.None, + ErrorCode.GroupCoordinatorNotAvailable, + ErrorCode.GroupAuthorizationFailed + )] ErrorCode errorCode, + [Values(0, 1)] int coordinatorId + ) { - var response = KafkaEncoder.Decode(new RequestContext(1), ApiKey.Metadata, MessageHelper.CreateMetadataResponse(new RequestContext(1), "Test").Skip(KafkaEncoder.ResponseHeaderSize)); + var response = new GroupCoordinatorResponse(errorCode, coordinatorId, "broker-" + coordinatorId, 9092 + coordinatorId); - Assert.That(response.Topics[0].TopicName, Is.EqualTo("Test")); + response.AssertCanEncodeDecodeResponse(0); } [Test] - public void InterfacesAreFormattedWithinProtocol() + public void ApiVersionsRequest() { - var request = new SyncGroupRequest("group", 5, "member", new []{ new SyncGroupRequest.GroupAssignment("member", new ConsumerMemberAssignment(new []{ new TopicPartition("topic", 0), new TopicPartition("topic", 1) }))}); - var formatted = request.ToString(); - Assert.That(formatted.Contains("TopicName:topic")); - Assert.That(formatted.Contains("PartitionId:1")); + var request = new ApiVersionsRequest(); + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void ApiVersionsResponse( + [Values( + ErrorCode.None, + ErrorCode.BrokerNotAvailable + )] ErrorCode errorCode + ) + { + var supported = new List(); + for (short apiKey = 0; apiKey <= 18; apiKey++) { + supported.Add(new ApiVersionsResponse.VersionSupport((ApiKey)apiKey, 0, (short)_randomizer.Next(0, 2))); + } + var response = new ApiVersionsResponse(errorCode, supported); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void JoinGroupRequest( + [Values(0, 1)] short version, + [Values("test", "a groupId")] string groupId, + [Values(1, 20000)] int sessionTimeout, + [Values("", "an existing member")] string memberId, + [Values("consumer", "other")] string protocolType, + [Values(1, 10)] int protocolsPerRequest) + { + var protocols = new List(); + for (var p = 0; p < protocolsPerRequest; p++) { + var bytes = new byte[protocolsPerRequest*100]; + _randomizer.NextBytes(bytes); + protocols.Add(new JoinGroupRequest.GroupProtocol(new ByteTypeMetadata("known", new ArraySegment(bytes)))); + } + var request = new JoinGroupRequest(groupId, TimeSpan.FromMilliseconds(sessionTimeout), memberId, protocolType, protocols, version >= 1 ? (TimeSpan?)TimeSpan.FromMilliseconds(sessionTimeout * 2) : null); + + request.AssertCanEncodeDecodeRequest(version, new ByteMembershipEncoder(protocolType)); + } + + [Test] + public void JoinGroupResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Values(0, 1, 20000)] int generationId, + [Values("consumer", "other")] string protocol, + [Values("test", "a groupId")] string leaderId, + [Values("", "an existing member")] string memberId, + [Values(1, 10)] int memberCount) + { + var members = new List(); + for (var m = 0; m < memberCount; m++) { + var bytes = new byte[memberCount*100]; + _randomizer.NextBytes(bytes); + members.Add(new JoinGroupResponse.Member(memberId + m, new ByteTypeMetadata("known", new ArraySegment(bytes)))); + } + var response = new JoinGroupResponse(errorCode, generationId, "known", leaderId, memberId, members); + + response.AssertCanEncodeDecodeResponse(0, new ByteMembershipEncoder(protocol)); + } + + [Test] + public void JoinConsumerGroupRequest( + [Values("test", "a groupId")] string groupId, + [Values(1, 20000)] int sessionTimeout, + [Values("", "an existing member")] string memberId, + [Values("mine", "yours")] string protocol, + [Values(1, 10)] int protocolsPerRequest) + { + var encoder = new ConsumerEncoder(); + var protocols = new List(); + for (var p = 0; p < protocolsPerRequest; p++) { + var userData = new byte[protocolsPerRequest*100]; + _randomizer.NextBytes(userData); + var metadata = new ConsumerProtocolMetadata(new []{ groupId, memberId, protocol }, protocol + p, new ArraySegment(userData), 0); + protocols.Add(new JoinGroupRequest.GroupProtocol(metadata)); + } + var request = new JoinGroupRequest(groupId, TimeSpan.FromMilliseconds(sessionTimeout), memberId, ConsumerEncoder.Protocol, protocols); + + request.AssertCanEncodeDecodeRequest(0, encoder); + } + + [Test] + public void JoinConsumerGroupResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Values(0, 1, 20000)] int generationId, + [Values("consumer")] string protocol, + [Values("test", "a groupId")] string leaderId, + [Values("", "an existing member")] string memberId, + [Values(1, 10)] int memberCount) + { + var encoder = new ConsumerEncoder(); + var members = new List(); + for (var m = 0; m < memberCount; m++) { + var userData = new byte[memberCount*100]; + _randomizer.NextBytes(userData); + var metadata = new ConsumerProtocolMetadata(new []{ protocol, memberId, leaderId }, protocol, new ArraySegment(userData), 0); + members.Add(new JoinGroupResponse.Member(memberId + m, metadata)); + } + var response = new JoinGroupResponse(errorCode, generationId, protocol, leaderId, memberId, members); + + response.AssertCanEncodeDecodeResponse(0, encoder); + } + + [Test] + public void HeartbeatRequest( + [Values("test", "a groupId")] string groupId, + [Values(0, 1, 20000)] int generationId, + [Values("", "an existing member")] string memberId) + { + var request = new HeartbeatRequest(groupId, generationId, memberId); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void HeartbeatResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode) + { + var response = new HeartbeatResponse(errorCode); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void LeaveGroupRequest( + [Values("test", "a groupId")] string groupId, + [Values("", "an existing member")] string memberId) + { + var request = new LeaveGroupRequest(groupId, memberId); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void LeaveGroupResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode) + { + var response = new LeaveGroupResponse(errorCode); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void SyncGroupRequest( + [Values("test", "a groupId")] string groupId, + [Values(0, 1, 20000)] int generationId, + [Values("", "an existing member")] string memberId, + [Values("consumer", "other")] string protocolType, + [Values(1, 10)] int assignmentsPerRequest) + { + var assignments = new List(); + for (var a = 0; a < assignmentsPerRequest; a++) { + var bytes = new byte[assignmentsPerRequest*100]; + _randomizer.NextBytes(bytes); + assignments.Add(new SyncGroupRequest.GroupAssignment(protocolType + a, new ByteTypeAssignment(new ArraySegment(bytes)))); + } + var request = new SyncGroupRequest(groupId, generationId, memberId, assignments); + + request.AssertCanEncodeDecodeRequest(0, new ByteMembershipEncoder(protocolType)); + } + + [Test] + public void SyncGroupResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode) + { + var bytes = new byte[1000]; + _randomizer.NextBytes(bytes); + var response = new SyncGroupResponse(errorCode, new ByteTypeAssignment(new ArraySegment(bytes))); + + response.AssertCanEncodeDecodeResponse(0, new ByteMembershipEncoder("protocolType")); + } + + [Test] + public void SyncConsumerGroupRequest( + [Values("test", "a groupId")] string groupId, + [Values(0, 1, 20000)] int generationId, + [Values("", "an existing member")] string memberId, + [Values("consumer")] string protocolType, + [Values(1, 10)] int assignmentsPerRequest) + { + var encoder = new ConsumerEncoder(); + var assignments = new List(); + for (var a = 0; a < assignmentsPerRequest; a++) { + var topics = new List(); + for (var t = 0; t < assignmentsPerRequest; t++) { + topics.Add(new TopicPartition(groupId + t, t)); + } + var userData = new byte[assignmentsPerRequest*100]; + _randomizer.NextBytes(userData); + var assignment = new ConsumerMemberAssignment(topics, new ArraySegment(userData), 0); + assignments.Add(new SyncGroupRequest.GroupAssignment(protocolType + a, assignment)); + } + var request = new SyncGroupRequest(groupId, generationId, memberId, assignments); + + request.AssertCanEncodeDecodeRequest(0, encoder); + } + + [Test] + public void SyncConsumerGroupResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Values(1, 10)] int memberCount) + { + var encoder = new ConsumerEncoder(); + var topics = new List(); + for (var t = 0; t < memberCount; t++) { + topics.Add(new TopicPartition("topic foo" + t, t)); + } + var userData = new byte[memberCount*100]; + _randomizer.NextBytes(userData); + var assignment = new ConsumerMemberAssignment(topics, new ArraySegment(userData), 0); + var response = new SyncGroupResponse(errorCode, assignment); + + response.AssertCanEncodeDecodeResponse(0, encoder); + } + + [Test] + public void DescribeGroupsRequest( + [Values("test", "a groupId")] string groupId, + [Range(1, 10)] int count) + { + var groups = new string[count]; + for (var g = 0; g < count; g++) { + groups[g] = groupId + g; + } + var request = new DescribeGroupsRequest(groups); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void DescribeGroupsResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Values("test", "a groupId")] string groupId, + [Range(2, 3)] int count, + [Values(KafkaClient.Protocol.DescribeGroupsResponse.Group.States.Stable, KafkaClient.Protocol.DescribeGroupsResponse.Group.States.Dead)] string state, + [Values("consumer", "unknown")] string protocolType, + [Values("good", "bad", "ugly")] string protocol) + { + var groups = new DescribeGroupsResponse.Group[count]; + for (var g = 0; g < count; g++) { + var members = new List(); + for (var m = 0; m < count; m++) { + var metadata = new byte[count*100]; + var assignment = new byte[count*10]; + _randomizer.NextBytes(metadata); + _randomizer.NextBytes(assignment); + + members.Add(new DescribeGroupsResponse.Member("member" + m, "client" + m, "host-" + m, new ByteTypeMetadata(protocol, new ArraySegment(metadata)), new ByteTypeAssignment(new ArraySegment(assignment)))); + } + groups[g] = new DescribeGroupsResponse.Group(errorCode, groupId + g, state, protocolType, protocol, members); + } + var response = new DescribeGroupsResponse(groups); + + response.AssertCanEncodeDecodeResponse(0, new ByteMembershipEncoder(protocolType)); + } + + [Test] + public void DescribeConsumerGroupsResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Values("test", "a groupId")] string groupId, + [Range(2, 3)] int count, + [Values(KafkaClient.Protocol.DescribeGroupsResponse.Group.States.Stable, Protocol.DescribeGroupsResponse.Group.States.AwaitingSync)] string state, + [Values("consumer")] string protocolType, + [Values("good", "bad", "ugly")] string protocol) + { + var encoder = new ConsumerEncoder(); + var groups = new DescribeGroupsResponse.Group[count]; + for (var g = 0; g < count; g++) { + var members = new List(); + for (var m = 0; m < count; m++) { + var memberId = "member" + m; + var userData = new byte[count*100]; + _randomizer.NextBytes(userData); + var metadata = new ConsumerProtocolMetadata(new []{ protocol, memberId, memberId }, protocol, new ArraySegment(userData), 0); + + var topics = new List(); + for (var t = 0; t < count; t++) { + topics.Add(new TopicPartition("topic foo" + t, t)); + } + var assignment = new ConsumerMemberAssignment(topics, new ArraySegment(userData), 0); + + members.Add(new DescribeGroupsResponse.Member(memberId, "client" + m, "host-" + m, metadata, assignment)); + } + groups[g] = new DescribeGroupsResponse.Group(errorCode, groupId + g, state, protocolType, protocol, members); + } + var response = new DescribeGroupsResponse(groups); + + response.AssertCanEncodeDecodeResponse(0, encoder); + } + + [Test] + public void ListGroupsRequest() + { + var request = new ListGroupsRequest(); + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void ListGroupsResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Values("test", "a groupId")] string groupId, + [Range(2, 3)] int count, + [Values("consumer")] string protocolType) + { + var groups = new ListGroupsResponse.Group[count]; + for (var g = 0; g < count; g++) { + groups[g] = new ListGroupsResponse.Group(groupId + g, protocolType); + } + var response = new ListGroupsResponse(errorCode, groups); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void SaslHandshakeRequest( + [Values("EXTERNAL", "ANONYMOUS", "PLAIN", "OTP", "SKEY", "CRAM-MD5", "DIGEST-MD5", "SCRAM", "NTLM", "GSSAPI", "OAUTHBEARER")] string mechanism) + { + var request = new SaslHandshakeRequest(mechanism); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void SaslHandshakeResponse( + [Values( + ErrorCode.None, + ErrorCode.OffsetMetadataTooLarge + )] ErrorCode errorCode, + [Range(1, 11)] int count) + { + var mechanisms = new[] { "EXTERNAL", "ANONYMOUS", "PLAIN", "OTP", "SKEY", "CRAM-MD5", "DIGEST-MD5", "SCRAM", "NTLM", "GSSAPI", "OAUTHBEARER" }; + var response = new SaslHandshakeResponse(errorCode, mechanisms.Take(count)); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void DeleteTopicsRequest( + [Values("test", "anotherNameForATopic")] string topicName, + [Range(2, 3)] int count, + [Values(0, 1, 20000)] int timeoutMilliseconds) + { + var topics = new string[count]; + for (var t = 0; t < count; t++) { + topics[t] = topicName + t; + } + var request = new DeleteTopicsRequest(topics, TimeSpan.FromMilliseconds(timeoutMilliseconds)); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void DeleteTopicsResponse( + [Values( + ErrorCode.None, + ErrorCode.NotController + )] ErrorCode errorCode, + [Values("test", "anotherNameForATopic")] string topicName, + [Range(1, 11)] int count) + { + var topics = new TopicsResponse.Topic[count]; + for (var t = 0; t < count; t++) { + topics[t] = new TopicsResponse.Topic(topicName + t, errorCode); + } + var response = new DeleteTopicsResponse(topics); + + response.AssertCanEncodeDecodeResponse(0); + } + + [Test] + public void CreateTopicsRequest( + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int partitionsPerTopic, + [Values(1, 3)] short replicationFactor, + [Values(0, 3)] int configCount, + [Values(0, 1, 20000)] int timeoutMilliseconds) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var configs = new Dictionary(); + for (var c = 0; c < configCount; c++) { + configs["config-" + c] = Guid.NewGuid().ToString("N"); + } + if (configs.Count == 0 && _randomizer.NextBool()) { + configs = null; + } + topics.Add(new CreateTopicsRequest.Topic(topicName + t, partitionsPerTopic, replicationFactor, configs)); + } + var request = new CreateTopicsRequest(topics, TimeSpan.FromMilliseconds(timeoutMilliseconds)); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void CreateTopicsExplicitRequest( + [Values("testTopic")] string topicName, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int partitionsPerTopic, + [Values(1, 3)] short replicationFactor, + [Values(0, 3)] int configCount, + [Values(0, 1, 20000)] int timeoutMilliseconds) + { + var topics = new List(); + for (var t = 0; t < topicsPerRequest; t++) { + var configs = new Dictionary(); + for (var c = 0; c < configCount; c++) { + configs["config-" + c] = Guid.NewGuid().ToString("N"); + } + if (configs.Count == 0 && _randomizer.NextBool()) { + configs = null; + } + + var assignments = new List(); + for (var partitionId = 0; partitionId < partitionsPerTopic; partitionId++) { + var replica = 0; + var replicas = _randomizer.Next(0, replicationFactor - 1).Repeat(() => replica++); + assignments.Add(new CreateTopicsRequest.ReplicaAssignment(partitionId, replicas)); + } + topics.Add(new CreateTopicsRequest.Topic(topicName + t, assignments, configs)); + } + var request = new CreateTopicsRequest(topics, TimeSpan.FromMilliseconds(timeoutMilliseconds)); + + request.AssertCanEncodeDecodeRequest(0); + } + + [Test] + public void CreateTopicsResponse( + [Values( + ErrorCode.None, + ErrorCode.InvalidTopic, + ErrorCode.InvalidPartitions + )] ErrorCode errorCode, + [Values("test", "anotherNameForATopic")] string topicName, + [Range(1, 11)] int count) + { + var topics = new TopicsResponse.Topic[count]; + for (var t = 0; t < count; t++) { + topics[t] = new TopicsResponse.Topic(topicName + t, errorCode); + } + var response = new CreateTopicsResponse(topics); + + response.AssertCanEncodeDecodeResponse(0); + } + + private IEnumerable GenerateMessages(int count, byte version, MessageCodec codec = MessageCodec.None) + { + var random = new Random(42); + var messages = new List(); + for (var m = 0; m < count; m++) { + var key = m > 0 ? new byte[8] : null; + var value = new byte[8*(m + 1)]; + if (key != null) { + random.NextBytes(key); + } + random.NextBytes(value); + + messages.Add(new Message(new ArraySegment(value), key != null ? new ArraySegment(key) : new ArraySegment(), (byte)codec, version: version, timestamp: version > 0 ? DateTimeOffset.UtcNow : (DateTimeOffset?)null)); + } + return messages; } } } \ No newline at end of file diff --git a/src/KafkaClient.Tests/Unit/RetryTests.cs b/src/KafkaClient.Tests/Unit/RetryTests.cs index 82030ef3..e63616ab 100644 --- a/src/KafkaClient.Tests/Unit/RetryTests.cs +++ b/src/KafkaClient.Tests/Unit/RetryTests.cs @@ -1,4 +1,7 @@ using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; using KafkaClient.Common; using NUnit.Framework; @@ -7,6 +10,25 @@ namespace KafkaClient.Tests.Unit [TestFixture] public class RetryTests { + [Test] + public async Task NoDelayBeforeFirstAttempt() + { + var timer = new Stopwatch(); + timer.Start(); + var result = await Retry.WithBackoff(5, minimumDelay: TimeSpan.FromSeconds(1), maximumDelay: TimeSpan.FromSeconds(1)) + .TryAsync( + (attempt, s) => + { + timer.Stop(); + return Task.FromResult(new RetryAttempt(timer.ElapsedMilliseconds)); + }, + null, + null, + null, + CancellationToken.None); + Assert.That(result, Is.LessThan(1000)); + } + [Test] public void RetryNoneDoesNotRetry() { diff --git a/src/KafkaClient.Tests/Unit/RouterSendAsyncTests.cs b/src/KafkaClient.Tests/Unit/RouterSendAsyncTests.cs index 940dfe61..4738806c 100644 --- a/src/KafkaClient.Tests/Unit/RouterSendAsyncTests.cs +++ b/src/KafkaClient.Tests/Unit/RouterSendAsyncTests.cs @@ -92,7 +92,7 @@ public async Task SendProtocolRequestShouldNotTryToRefreshMataDataIfCanNotRecove public async Task ShouldUpdateMetadataOnce() { var routerProxy = new FakeRouter(); - var cacheExpiration = TimeSpan.FromMilliseconds(10); + var cacheExpiration = TimeSpan.FromMilliseconds(100); var router = routerProxy.Create(cacheExpiration); routerProxy.Connection1.Add(ApiKey.Fetch, ShouldReturnValidMessage); diff --git a/src/KafkaClient.Tests/Unit/TransportTests.cs b/src/KafkaClient.Tests/Unit/TransportTests.cs index d5b318d2..db54aafe 100644 --- a/src/KafkaClient.Tests/Unit/TransportTests.cs +++ b/src/KafkaClient.Tests/Unit/TransportTests.cs @@ -24,7 +24,7 @@ public async Task ShouldAttemptMultipleTimesWhenConnectionFails() { var count = 0; var config = new ConnectionConfiguration(onConnecting: (e, a, _) => Interlocked.Increment(ref count)); - using (var transport = new SocketTransport(await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log), config, TestConfig.Log)) { + using (var transport = new SocketTransport(TestConfig.ServerEndpoint(), config, TestConfig.Log)) { var task = transport.ConnectAsync(CancellationToken.None); await TaskTest.WaitFor(() => count > 1, 10000); Assert.That(count, Is.GreaterThan(1)); @@ -38,25 +38,26 @@ public async Task ShouldAttemptMultipleTimesWhenConnectionFails() [Test] public async Task ShouldDisposeWithoutExceptionThrown() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { - var conn = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log); - await Task.WhenAny(server.ClientConnected, Task.Delay(TimeSpan.FromSeconds(3))); - using (conn) { } + var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log); + server.OnConnected = () => transport.Dispose(); + await Task.WhenAny(transport.ConnectAsync(CancellationToken.None), Task.Delay(TimeSpan.FromSeconds(3))); + transport.Dispose(); } } [Test] public async Task ShouldDisposeEvenWhilePollingToReconnect() { - var connectionAttempt = 0; - var config = new ConnectionConfiguration(onConnecting: (e, a, _) => connectionAttempt = a); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var connectionAttempt = -1; + var config = new ConnectionConfiguration(Retry.AtMost(5), onConnecting: (e, a, _) => connectionAttempt = a); + var endpoint = TestConfig.ServerEndpoint(); using (var transport = new SocketTransport(endpoint, config, TestConfig.Log)) { var taskResult = transport.ConnectAsync(CancellationToken.None); - await TaskTest.WaitFor(() => connectionAttempt > 1); + await TaskTest.WaitFor(() => connectionAttempt >= 0); transport.Dispose(); await Task.WhenAny(taskResult, Task.Delay(1000)).ConfigureAwait(false); @@ -71,7 +72,7 @@ public async Task ShouldDisposeEvenWhileAwaitingReadAndThrowException() { int readSize = 0; var config = new ConnectionConfiguration(onReading: (e, size) => readSize = size); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { var transport = new SocketTransport(endpoint, config, TestConfig.Log); try { @@ -106,7 +107,7 @@ public async Task ReadShouldBlockUntilAllBytesRequestedAreReceived() var config = new ConnectionConfiguration( onReadBytes: (e, attempted, actual, elapsed) => Interlocked.Add(ref bytesReceived, actual), onRead: (e, read, elapsed) => Interlocked.Increment(ref readCompleted)); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, config, TestConfig.Log)) { await transport.ConnectAsync(CancellationToken.None); @@ -139,7 +140,7 @@ public async Task ReadShouldBlockUntilAllBytesRequestedAreReceived() [Test] public async Task ReadShouldBeAbleToReceiveMoreThanOnce() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { const int firstMessage = 99; @@ -165,7 +166,7 @@ public async Task ReadShouldBeAbleToReceiveMoreThanOnce() [Test] public async Task ReadShouldBeAbleToReceiveMoreThanOnceAsyncronously() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { const int firstMessage = 99; @@ -193,7 +194,7 @@ public async Task ReadShouldBeAbleToReceiveMoreThanOnceAsyncronously() [Test] public async Task ReadShouldNotLoseDataFromStreamOverMultipleReads() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { const int firstMessage = 99; @@ -227,7 +228,7 @@ public async Task ReadShouldNotLoseDataFromStreamOverMultipleReads() public async Task ReadShouldThrowServerDisconnectedExceptionWhenDisconnected() { var disconnectedCount = 0; - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log) { OnDisconnected = () => Interlocked.Increment(ref disconnectedCount) }) { @@ -256,7 +257,7 @@ public async Task WhenNoConnectionThrowSocketExceptionAfterMaxRetry() { var reconnectionAttempt = 0; const int maxAttempts = 3; - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); var config = new ConnectionConfiguration( Retry.AtMost(maxAttempts), onConnecting: (e, attempt, elapsed) => Interlocked.Increment(ref reconnectionAttempt) @@ -275,7 +276,7 @@ public async Task WhenNoConnectionThrowSocketExceptionAfterMaxRetry() [Test] public async Task ReadShouldStackReadRequestsAndReturnOneAtATime() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { var messages = new[] { "test1", "test2", "test3", "test4" }; @@ -308,14 +309,16 @@ public async Task ReadShouldStackReadRequestsAndReturnOneAtATime() [Test] public async Task WriteAsyncShouldSendData() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { const int testData = 99; int result = 0; - server.OnBytesReceived = data => result = data.ToInt32(); +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + server.OnReceivedAsync = async data => result = data.ToInt32(); +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously await transport.ConnectAsync(CancellationToken.None); await transport.WriteBytesAsync(5, new ArraySegment(testData.ToBytes()), CancellationToken.None); @@ -327,14 +330,16 @@ public async Task WriteAsyncShouldSendData() [Test] public async Task WriteAsyncShouldAllowMoreThanOneWrite() { - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { const int testData = 99; var results = new List(); - server.OnBytesReceived = data => results.AddRange(data.Array.Skip(data.Offset).Take(data.Count)); +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + server.OnReceivedAsync = async data => results.AddRange(data.Array.Skip(data.Offset).Take(data.Count)); +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously await transport.ConnectAsync(CancellationToken.None); await Task.WhenAll( @@ -353,15 +358,17 @@ public async Task AsynchronousWriteAndReadShouldBeConsistent() var readOnServer = new ConcurrentBag(); var readOnClient = new ConcurrentBag(); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { - server.OnBytesReceived = data => { +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + server.OnReceivedAsync = async data => { var d = data.Batch(4).Select(x => x.ToArray().ToInt32()); foreach (var item in d) { readOnServer.Add(item); } }; +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously await transport.ConnectAsync(CancellationToken.None); var clientWriteTasks = expected.Select(i => transport.WriteBytesAsync(i, new ArraySegment(i.ToBytes()), CancellationToken.None)); @@ -395,16 +402,19 @@ public async Task AsynchronousWriteAndReadShouldBeConsistent() public async Task WriteShouldHandleLargeVolumeSendAsynchronously([Values(1000, 5000)] int requests) { var readOnServer = new ConcurrentBag(); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, TestConfig.Options.ConnectionConfiguration, TestConfig.Log)) { - server.OnBytesReceived = data => +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + server.OnReceivedAsync = async data => { var d = data.Batch(4).Select(x => x.ToArray().ToInt32()); foreach (var item in d) { readOnServer.Add(item); } }; +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously + await transport.ConnectAsync(CancellationToken.None); var clientWriteTasks = Enumerable.Range(1, requests).Select(i => transport.WriteBytesAsync(i, new ArraySegment(i.ToBytes()), CancellationToken.None)); @@ -421,7 +431,7 @@ public async Task WriteShouldCancelWhileSendingData() { var clientWriteAttempts = 0; var config = new ConnectionConfiguration(onWritingBytes: (e, payload) => Interlocked.Increment(ref clientWriteAttempts)); - var endpoint = await Endpoint.ResolveAsync(TestConfig.ServerUri(), TestConfig.Log); + var endpoint = TestConfig.ServerEndpoint(); using (var server = new TcpServer(endpoint.Ip.Port, TestConfig.Log)) { using (var transport = new SocketTransport(endpoint, config, TestConfig.Log)) { using (var token = new CancellationTokenSource()) diff --git a/src/KafkaClient.Tests/project.json b/src/KafkaClient.Tests/project.json index d6d081dd..ebb243a6 100644 --- a/src/KafkaClient.Tests/project.json +++ b/src/KafkaClient.Tests/project.json @@ -12,7 +12,6 @@ "System.Net.Sockets": "4.3.0", "Nito.AsyncEx.Coordination": "1.0.2", - "Nito.AsyncEx.Context": "1.1.0", "NSubstitute": "2.0.0-rc", "NUnit": "3.6.0", "System.Threading.Tasks.Parallel": "4.3.0", diff --git a/src/KafkaClient.Tests/project.lock.json b/src/KafkaClient.Tests/project.lock.json index dc868a9c..7b8ddbf5 100644 --- a/src/KafkaClient.Tests/project.lock.json +++ b/src/KafkaClient.Tests/project.lock.json @@ -559,21 +559,6 @@ "lib/netstandard1.0/Newtonsoft.Json.dll": {} } }, - "Nito.AsyncEx.Context/1.1.0": { - "type": "package", - "dependencies": { - "Nito.AsyncEx.Tasks": "1.1.0", - "System.Collections.Concurrent": "4.0.12", - "System.Diagnostics.Debug": "4.0.11", - "System.Threading": "4.0.11" - }, - "compile": { - "lib/netstandard1.3/Nito.AsyncEx.Context.dll": {} - }, - "runtime": { - "lib/netstandard1.3/Nito.AsyncEx.Context.dll": {} - } - }, "Nito.AsyncEx.Coordination/1.0.2": { "type": "package", "dependencies": { @@ -590,7 +575,7 @@ "lib/netstandard1.3/Nito.AsyncEx.Coordination.dll": {} } }, - "Nito.AsyncEx.Tasks/1.1.0": { + "Nito.AsyncEx.Tasks/1.0.1": { "type": "package", "dependencies": { "Nito.Disposables": "1.0.0", @@ -3360,19 +3345,6 @@ "tools/install.ps1" ] }, - "Nito.AsyncEx.Context/1.1.0": { - "sha512": "YPEb4a5lHPmESSOopAqxqpMqUO/OeOAr141TZCIHjc47I/41AOPTfymBbF/YDKaZYlElzVvGHBCR1Z8J4sBMOA==", - "type": "package", - "path": "Nito.AsyncEx.Context/1.1.0", - "files": [ - "Nito.AsyncEx.Context.1.1.0.nupkg.sha512", - "Nito.AsyncEx.Context.nuspec", - "lib/net46/Nito.AsyncEx.Context.dll", - "lib/net46/Nito.AsyncEx.Context.xml", - "lib/netstandard1.3/Nito.AsyncEx.Context.dll", - "lib/netstandard1.3/Nito.AsyncEx.Context.xml" - ] - }, "Nito.AsyncEx.Coordination/1.0.2": { "sha512": "jEZHo/pFKD6kzjuxpAGcWWFT7J3UGft2NH6zZ7cBjO3k5s9SDoQhLJxSRDafx44gbao7PK8IQXxIOLBKIPPQTg==", "type": "package", @@ -3386,12 +3358,12 @@ "lib/netstandard1.3/Nito.AsyncEx.Coordination.xml" ] }, - "Nito.AsyncEx.Tasks/1.1.0": { - "sha512": "0HzeuwnELt9JMMm4KS7NLW0jcacj8gMuSKNPPkXfP+EXk17/K+WyAq2Bv79VrzD2kAyOLavd9MF8JbggloJZWw==", + "Nito.AsyncEx.Tasks/1.0.1": { + "sha512": "FtQi5Qj22kxHNQv8XPYXp2xc4PPTfyr5jGATgDqRml9BF8OEYwuntSRrbGbxlbWpp0ossrXyUq2gIV1CTQQB2w==", "type": "package", - "path": "Nito.AsyncEx.Tasks/1.1.0", + "path": "Nito.AsyncEx.Tasks/1.0.1", "files": [ - "Nito.AsyncEx.Tasks.1.1.0.nupkg.sha512", + "Nito.AsyncEx.Tasks.1.0.1.nupkg.sha512", "Nito.AsyncEx.Tasks.nuspec", "lib/net46/Nito.AsyncEx.Tasks.dll", "lib/net46/Nito.AsyncEx.Tasks.xml", @@ -8360,7 +8332,6 @@ "KafkaClient.Testing", "NSubstitute >= 2.0.0-rc", "NUnit >= 3.6.0", - "Nito.AsyncEx.Context >= 1.1.0", "Nito.AsyncEx.Coordination >= 1.0.2", "System.Collections.Immutable >= 1.3.1", "System.Net.Sockets >= 4.3.0", diff --git a/src/KafkaClient/Connections/Connection.cs b/src/KafkaClient/Connections/Connection.cs index 3ba626e7..8f762b05 100644 --- a/src/KafkaClient/Connections/Connection.cs +++ b/src/KafkaClient/Connections/Connection.cs @@ -155,7 +155,7 @@ private async Task DedicatedReceiveAsync() var header = new byte[KafkaEncoder.ResponseHeaderSize]; AsyncItem asyncItem = null; // use backoff so we don't take over the CPU when there's a failure - await Retry.Until(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(5)).TryAsync( + await Retry.WithBackoff(int.MaxValue, minimumDelay: TimeSpan.FromMilliseconds(5), maximumDelay: TimeSpan.FromSeconds(5)).TryAsync( async attempt => { await _transport.ConnectAsync(_disposeToken.Token).ConfigureAwait(false); @@ -227,12 +227,12 @@ private AsyncItem LookupByCorrelateId(int correlationId, int expectedBytes) return new AsyncItem(new RequestContext(correlationId), new UnknownRequest()); } - private const int OverflowGuard = int.MaxValue >> 1; + internal static int OverflowGuard = int.MaxValue >> 1; private int NextCorrelationId() { var id = Interlocked.Increment(ref _correlationIdSeed); - if (id > OverflowGuard) { - // to avoid overflow + if (id >= OverflowGuard) { + // to avoid integer overflow Interlocked.Exchange(ref _correlationIdSeed, 0); } return id; @@ -256,6 +256,7 @@ private void RemoveFromCorrelationMatching(AsyncItem asyncItem, Exception except if (_requestsByCorrelation.TryRemove(correlationId, out request)) { _log.Info(() => LogEvent.Create($"Removed request {request.ApiKey} (id {correlationId}): timed out or otherwise errored in client.")); if (_timedOutRequestsByCorrelation.Count > 100) { + _log.Debug(() => LogEvent.Create($"Clearing timed out requests to avoid overflow ({_timedOutRequestsByCorrelation.Count}).")); _timedOutRequestsByCorrelation.Clear(); } _timedOutRequestsByCorrelation.TryAdd(correlationId, request); @@ -341,11 +342,10 @@ public void ResponseCompleted(ILog log) } log.Info(() => LogEvent.Create($"Received {ApiKey} response (id {Context.CorrelationId}, {ResponseStream.Length + KafkaEncoder.CorrelationSize} bytes)")); if (!ReceiveTask.TrySetResult(bytes)) { - log.Debug( - () => { - var result = KafkaEncoder.Decode(Context, ApiKey, bytes); - return LogEvent.Create($"Timed out -----> (timed out or otherwise errored in client) {{Context:{Context},\n{ApiKey}Response:{result}}}"); - }); + log.Debug(() => { + var result = KafkaEncoder.Decode(Context, ApiKey, bytes); + return LogEvent.Create($"Timed out -----> (timed out or otherwise errored in client) {{Context:{Context},\n{ApiKey}Response:{result}}}"); + }); } } diff --git a/src/KafkaClient/Extensions.cs b/src/KafkaClient/Extensions.cs index 9d44dac5..e1a70b78 100644 --- a/src/KafkaClient/Extensions.cs +++ b/src/KafkaClient/Extensions.cs @@ -24,6 +24,12 @@ public static IVersionSupport Dynamic(this VersionSupport versionSupport) #region KafkaOptions + public static async Task CreateConnectionAsync(this KafkaOptions options) + { + var endpoint = await Endpoint.ResolveAsync(options.ServerUris.First(), options.Log); + return options.CreateConnection(endpoint); + } + public static IConnection CreateConnection(this KafkaOptions options, Endpoint endpoint) { return options.ConnectionFactory.Create(endpoint, options.ConnectionConfiguration, options.Log);