Skip to content

Commit

Permalink
fix tests with race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
awr committed Feb 1, 2017
1 parent 615c139 commit 73d6f58
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 45 deletions.
11 changes: 5 additions & 6 deletions src/KafkaClient.Tests/FakeRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public class FakeRouter
{
public const string TestTopic = "UnitTest";

public TimeSpan CacheExpiration = TimeSpan.FromMilliseconds(1);

private int _offset1;
public FakeConnection Connection1 { get; }

Expand Down Expand Up @@ -64,20 +62,21 @@ public FakeRouter()

var kafkaConnectionFactory = Substitute.For<IConnectionFactory>();
kafkaConnectionFactory
.Create(Arg.Is<Endpoint>(e => e.Ip.Port == 1), Arg.Any<IConnectionConfiguration>(),Arg.Any<ILog>())
.Create(Arg.Is<Endpoint>(e => e.Ip.Port == 1), Arg.Any<IConnectionConfiguration>(), Arg.Any<ILog>())
.Returns(Connection1);
kafkaConnectionFactory
.Create(Arg.Is<Endpoint>(e => e.Ip.Port == 2), Arg.Any<IConnectionConfiguration>(),Arg.Any<ILog>())
.Create(Arg.Is<Endpoint>(e => e.Ip.Port == 2), Arg.Any<IConnectionConfiguration>(), Arg.Any<ILog>())
.Returns(Connection2);
KafkaConnectionFactory = kafkaConnectionFactory;
}

public IRouter Create()
public IRouter Create(TimeSpan? cacheExpiration = null)
{
return new Router(
new [] { new Endpoint(new IPEndPoint(IPAddress.Loopback, 1)), new Endpoint(new IPEndPoint(IPAddress.Loopback, 2)) },
KafkaConnectionFactory,
routerConfiguration: new RouterConfiguration(cacheExpiration: CacheExpiration));
routerConfiguration: new RouterConfiguration(cacheExpiration: cacheExpiration.GetValueOrDefault(TimeSpan.FromMilliseconds(1))),
log: TestConfig.Log);
}

#pragma warning disable 1998
Expand Down
45 changes: 23 additions & 22 deletions src/KafkaClient.Tests/Unit/RouterSendAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ public class RouterSendAsyncTests
public async Task ShouldTryToRefreshMataDataIfCanRecoverByRefreshMetadata(ErrorCode code)
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = new TimeSpan(10);
var router = routerProxy.Create();
var cacheExpiration = new TimeSpan(10);
var router = routerProxy.Create(cacheExpiration);

routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageError(code, routerProxy.CacheExpiration));
routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageError(code, cacheExpiration));
routerProxy.Connection1.Add(ApiKey.Metadata, async _ => await FakeRouter.DefaultMetadataResponse());

await router.SendAsync(new FetchRequest(), FakeRouter.TestTopic, PartitionId, CancellationToken.None);
Expand All @@ -43,10 +43,10 @@ public async Task ShouldTryToRefreshMataDataIfCanRecoverByRefreshMetadata(ErrorC
public async Task ShouldTryToRefreshMataDataIfOnExceptions(Type exceptionType)
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create();
var cacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create(cacheExpiration);

routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageException(exceptionType, routerProxy.CacheExpiration));
routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageException(exceptionType, cacheExpiration));
routerProxy.Connection1.Add(ApiKey.Metadata, async _ => await FakeRouter.DefaultMetadataResponse());

await router.SendAsync(new FetchRequest(), FakeRouter.TestTopic, PartitionId, CancellationToken.None);
Expand All @@ -60,10 +60,10 @@ public async Task ShouldTryToRefreshMataDataIfOnExceptions(Type exceptionType)
public async Task SendProtocolRequestShouldThrowException(Type exceptionType)
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create();
var cacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create(cacheExpiration);

routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageException(exceptionType, routerProxy.CacheExpiration));
routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageException(exceptionType, cacheExpiration));
routerProxy.Connection1.Add(ApiKey.Metadata, async _ => await FakeRouter.DefaultMetadataResponse());
Assert.ThrowsAsync(exceptionType, async () => await router.SendAsync(new FetchRequest(), FakeRouter.TestTopic, PartitionId, CancellationToken.None));
}
Expand All @@ -80,10 +80,10 @@ public async Task SendProtocolRequestShouldNotTryToRefreshMataDataIfCanNotRecove
ErrorCode code)
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create();
var cacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create(cacheExpiration);

routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageError(code, routerProxy.CacheExpiration));
routerProxy.Connection1.Add(ApiKey.Fetch, FailedInFirstMessageError(code, cacheExpiration));
routerProxy.Connection1.Add(ApiKey.Metadata, async _ => await FakeRouter.DefaultMetadataResponse());
Assert.ThrowsAsync<RequestException>(async () => await router.SendAsync(new FetchRequest(), FakeRouter.TestTopic, PartitionId, CancellationToken.None));
}
Expand All @@ -92,8 +92,8 @@ public async Task SendProtocolRequestShouldNotTryToRefreshMataDataIfCanNotRecove
public async Task ShouldUpdateMetadataOnce()
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create();
var cacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create(cacheExpiration);

routerProxy.Connection1.Add(ApiKey.Fetch, ShouldReturnValidMessage);
routerProxy.Connection1.Add(ApiKey.Metadata, async _ => await FakeRouter.DefaultMetadataResponse());
Expand All @@ -103,7 +103,7 @@ public async Task ShouldUpdateMetadataOnce()
{
tasks[i] = router.SendAsync(new FetchRequest(), FakeRouter.TestTopic, PartitionId, CancellationToken.None);
}
await Task.Delay(routerProxy.CacheExpiration);
await Task.Delay(cacheExpiration);
await Task.Delay(1);
for (int i = 0; i < numberOfCall / 2; i++)
{
Expand All @@ -118,8 +118,9 @@ public async Task ShouldUpdateMetadataOnce()
[Test]
public async Task ShouldRecoverUpdateMetadataForNewTopic()
{
var routerProxy = new FakeRouter { CacheExpiration = TimeSpan.FromMilliseconds(100) };
var router = routerProxy.Create();
var routerProxy = new FakeRouter();
var cacheExpiration = TimeSpan.FromMilliseconds(100);
var router = routerProxy.Create(cacheExpiration);

var fetchRequest = new FetchRequest();

Expand Down Expand Up @@ -151,8 +152,8 @@ public async Task ShouldRecoverUpdateMetadataForNewTopic()
public async Task ShouldRecoverFromFailureByUpdateMetadataOnce() //Do not debug this test !!
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = TimeSpan.FromMilliseconds(1000);
var router = routerProxy.Create();
var cacheExpiration = TimeSpan.FromMilliseconds(1000);
var router = routerProxy.Create(cacheExpiration);

int partitionId = 0;
var fetchRequest = new FetchRequest();
Expand All @@ -168,7 +169,7 @@ public async Task ShouldRecoverFromFailureByUpdateMetadataOnce() //Do not debug
{
if (Interlocked.Increment(ref numberOfErrorSend) == numberOfCall)
{
await Task.Delay(routerProxy.CacheExpiration);
await Task.Delay(cacheExpiration);
await Task.Delay(1);
x.TrySetResult(1);
log.Debug(() => LogEvent.Create("all is complete "));
Expand Down Expand Up @@ -220,8 +221,8 @@ await ShouldRecoverByUpdateMetadataOnceFullScenario(
private async Task ShouldRecoverByUpdateMetadataOnceFullScenario(Func<IRequestContext, Task<IResponse>> fetchResponse)
{
var routerProxy = new FakeRouter();
routerProxy.CacheExpiration = TimeSpan.FromMilliseconds(0);
var router = routerProxy.Create();
var cacheExpiration = TimeSpan.Zero;
var router = routerProxy.Create(cacheExpiration);
int partitionId = 0;
var fetchRequest = new FetchRequest();

Expand Down
26 changes: 12 additions & 14 deletions src/KafkaClient.Tests/Unit/RouterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,11 @@ public async Task BrokerRouteShouldReturnGroupFromCache()
public async Task RefreshGroupMetadataShouldIgnoreCacheAndAlwaysCauseRequestAfterExpirationDate()
{
var routerProxy = new FakeRouter();
var router = routerProxy.Create();
TimeSpan cacheExpiration = TimeSpan.FromMilliseconds(100);
var cacheExpiration = TimeSpan.FromMilliseconds(100);
var router = routerProxy.Create(cacheExpiration);
await router.RefreshGroupBrokerAsync(TestTopic, true, CancellationToken.None);
Assert.That(routerProxy.Connection1[ApiKey.GroupCoordinator], Is.EqualTo(1));
await Task.Delay(routerProxy.CacheExpiration);
await Task.Delay(1);//After cache is expired
await Task.Delay(cacheExpiration.Add(TimeSpan.FromMilliseconds(1))); // After cache is expired
await router.RefreshGroupBrokerAsync(TestTopic, true, CancellationToken.None);
Assert.That(routerProxy.Connection1[ApiKey.GroupCoordinator], Is.EqualTo(2));
}
Expand All @@ -187,7 +186,7 @@ await Task.WhenAll(
public async Task SimultaneouslyGetGroupMetadataShouldGetDataFromCacheOnSameRequest()
{
var routerProxy = new FakeRouter();
var router = routerProxy.Create();
var router = routerProxy.Create(TimeSpan.FromMinutes(1)); // long timeout to avoid race condition on lock lasting longer than cache timeout

await Task.WhenAll(
router.GetGroupBrokerAsync(TestTopic, CancellationToken.None),
Expand Down Expand Up @@ -273,12 +272,11 @@ public async Task BrokerRouteShouldReturnAllTopicsFromCache()
public async Task RefreshTopicMetadataShouldIgnoreCacheAndAlwaysCauseMetadataRequestAfterExpirationDate()
{
var routerProxy = new FakeRouter();
var router = routerProxy.Create();
TimeSpan cacheExpiration = TimeSpan.FromMilliseconds(100);
var cacheExpiration = TimeSpan.FromMilliseconds(100);
var router = routerProxy.Create(cacheExpiration);
await router.RefreshTopicMetadataAsync(TestTopic, true, CancellationToken.None);
Assert.That(routerProxy.Connection1[ApiKey.Metadata], Is.EqualTo(1));
await Task.Delay(routerProxy.CacheExpiration);
await Task.Delay(1);//After cache is expired
await Task.Delay(cacheExpiration.Add(TimeSpan.FromMilliseconds(1))); // After cache is expired
await router.RefreshTopicMetadataAsync(TestTopic, true, CancellationToken.None);
Assert.That(routerProxy.Connection1[ApiKey.Metadata], Is.EqualTo(2));
}
Expand All @@ -299,16 +297,16 @@ public async Task SelectBrokerRouteShouldChange()
{
var routerProxy = new FakeRouter();

var router = routerProxy.Create();
var cacheExpiry = TimeSpan.FromMilliseconds(1);
var router = routerProxy.Create(cacheExpiry);

routerProxy.MetadataResponse = FakeRouter.DefaultMetadataResponse;
await router.RefreshTopicMetadataAsync(TestTopic, true, CancellationToken.None);

var router1 = router.GetTopicBroker(TestTopic, 0);

Assert.That(routerProxy.Connection1[ApiKey.Metadata], Is.EqualTo(1));
await Task.Delay(routerProxy.CacheExpiration);
await Task.Delay(1);//After cache is expired
await Task.Delay(cacheExpiry.Add(TimeSpan.FromMilliseconds(1))); // After cache is expired
routerProxy.MetadataResponse = FakeRouter.MetadataResponseWithSingleBroker;
await router.RefreshTopicMetadataAsync(TestTopic, true, CancellationToken.None);
var router2 = router.GetTopicBroker(TestTopic, 0);
Expand Down Expand Up @@ -336,12 +334,12 @@ await Task.WhenAll(
public async Task SimultaneouslyGetTopicMetadataShouldGetDataFromCacheOnSameRequest()
{
var routerProxy = new FakeRouter();
var router = routerProxy.Create();
var router = routerProxy.Create(TimeSpan.FromMinutes(1)); // long timeout to avoid race condition on lock lasting longer than cache timeout

await Task.WhenAll(
router.GetTopicMetadataAsync(TestTopic, CancellationToken.None),
router.GetTopicMetadataAsync(TestTopic, CancellationToken.None)
); //do not debug
);
Assert.That(routerProxy.Connection1[ApiKey.Metadata], Is.EqualTo(1));
}

Expand Down
3 changes: 2 additions & 1 deletion src/KafkaClient/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ private void UpdateGroupBrokerCache(GroupCoordinatorRequest request, GroupCoordi
if (request == null || response == null) return;

_groupBrokerCache = _groupBrokerCache.SetItem(request.GroupId, new Tuple<int, DateTimeOffset>(response.BrokerId, DateTimeOffset.UtcNow));
Log.Verbose(() => LogEvent.Create($"Router set broker for group {request.GroupId} to {response.BrokerId}"));
}

#endregion
Expand Down Expand Up @@ -533,7 +534,7 @@ private IImmutableDictionary<string, IMemberAssignment> TryGetCachedMemberAssign

private static bool HasExpired<T>(Tuple<T, DateTimeOffset> cachedValue, TimeSpan? expiration = null)
{
return expiration.HasValue && expiration.Value < DateTimeOffset.UtcNow - cachedValue.Item2;
return expiration.HasValue && cachedValue.Item2.Add(expiration.Value) < DateTimeOffset.UtcNow;
}

private class CachedResults<T>
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaClient/RouterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class RouterConfiguration : IRouterConfiguration
public RouterConfiguration(IRetry refreshRetry = null, TimeSpan? cacheExpiration = null, IRetry sendRetry = null)
{
RefreshRetry = refreshRetry ?? Defaults.RefreshRetry();
CacheExpiration = cacheExpiration ?? TimeSpan.FromMilliseconds(Defaults.CacheExpirationMilliseconds);
CacheExpiration = cacheExpiration ?? TimeSpan.FromSeconds(Defaults.CacheExpirationSeconds);
SendRetry = sendRetry ?? Retry.AtMost(Defaults.MaxSendRetryAttempts);
}

Expand Down Expand Up @@ -41,7 +41,7 @@ public static class Defaults
/// <summary>
/// The default expiration length for <see cref="CacheExpiration"/>
/// </summary>
public const int CacheExpirationMilliseconds = 1000;
public const int CacheExpirationSeconds = 1;

/// <summary>
/// The default attempts for <see cref="SendRetry"/>
Expand Down

0 comments on commit 73d6f58

Please sign in to comment.