Skip to content

Commit

Permalink
Merge pull request #641 from mattrjacobs/deflake-sempahore-count-tests
Browse files Browse the repository at this point in the history
Fix flakiness of testSemaphorePermitsInUse
  • Loading branch information
mattrjacobs committed Feb 5, 2015
2 parents 05387c2 + 7970331 commit 2e08019
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2170,23 +2170,27 @@ public void testSemaphorePermitsInUse() {
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(3));

// used to wait until all commands have started
final CountDownLatch startLatch = new CountDownLatch(sharedSemaphore.numberOfPermits.get() + 1);
final CountDownLatch startLatch = new CountDownLatch((sharedSemaphore.numberOfPermits.get() * 2) + 1);

// used to signal that all command can finish
final CountDownLatch sharedLatch = new CountDownLatch(1);

// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute();
} catch (Exception e) {
startLatch.countDown();
e.printStackTrace();
failureCount.incrementAndGet();
}
}
});

// creates group of threads each using command sharing a single semaphore

// I create extra threads and commands so that I can verify that some of them fail to obtain a semaphore
final int sharedThreadCount = sharedSemaphore.numberOfPermits.get() * 2;
final Thread[] sharedSemaphoreThreads = new Thread[sharedThreadCount];
Expand All @@ -2200,23 +2204,21 @@ public void run() {

final CountDownLatch isolatedLatch = new CountDownLatch(1);

// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute();
} catch (Exception e) {
startLatch.countDown();
e.printStackTrace();
failureCount.incrementAndGet();
}
}
}));

// verifies no permits in use before starting threads
assertEquals("wrong number of permits for shared semaphore", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore", 0, isolatedSemaphore.getNumberOfPermitsUsed());
assertEquals("before threads start, shared semaphore should be unused", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("before threads start, isolated semaphore should be unused", 0, isolatedSemaphore.getNumberOfPermitsUsed());

for (int i = 0; i < sharedThreadCount; i++) {
sharedSemaphoreThreads[i].start();
Expand All @@ -2231,9 +2233,9 @@ public void run() {
}

// verifies that all semaphores are in use
assertEquals("wrong number of permits for shared semaphore",
assertEquals("immediately after command start, all shared semaphores should be in-use",
sharedSemaphore.numberOfPermits.get().longValue(), sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore",
assertEquals("immediately after command start, isolated semaphore should be in-use",
isolatedSemaphore.numberOfPermits.get().longValue(), isolatedSemaphore.getNumberOfPermitsUsed());

// signals commands to finish
Expand All @@ -2251,12 +2253,11 @@ public void run() {
}

// verifies no permits in use after finishing threads
assertEquals("wrong number of permits for shared semaphore", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore", 0, isolatedSemaphore.getNumberOfPermitsUsed());
assertEquals("after all threads have finished, no shared semaphores should be in-use", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("after all threads have finished, isolated semaphore not in-use", 0, isolatedSemaphore.getNumberOfPermitsUsed());

// verifies that some executions failed
final int expectedFailures = sharedSemaphore.getNumberOfPermitsUsed();
assertEquals("failures expected but did not happen", expectedFailures, failureCount.get());
assertEquals("expected some of shared semaphore commands to get rejected", sharedSemaphore.numberOfPermits.get().longValue(), failureCount.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1588,17 +1588,22 @@ public void testSemaphorePermitsInUse() {
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(3));

// used to wait until all commands have started
final CountDownLatch startLatch = new CountDownLatch(sharedSemaphore.numberOfPermits.get() + 1);
final CountDownLatch startLatch = new CountDownLatch((sharedSemaphore.numberOfPermits.get()) * 2 + 1);

// used to signal that all command can finish
final CountDownLatch sharedLatch = new CountDownLatch(1);

// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).observe().toBlocking().single();
} catch (Exception e) {
startLatch.countDown();
e.printStackTrace();
failureCount.incrementAndGet();
}
}
});
Expand All @@ -1618,23 +1623,21 @@ public void run() {

final CountDownLatch isolatedLatch = new CountDownLatch(1);

// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).observe().toBlocking().single();
} catch (Exception e) {
startLatch.countDown();
e.printStackTrace();
failureCount.incrementAndGet();
}
}
}));

// verifies no permits in use before starting threads
assertEquals("wrong number of permits for shared semaphore", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore", 0, isolatedSemaphore.getNumberOfPermitsUsed());
assertEquals("before threads start, shared semaphore should be unused", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("before threads start, isolated semaphore should be unused", 0, isolatedSemaphore.getNumberOfPermitsUsed());

for (int i = 0; i < sharedThreadCount; i++) {
sharedSemaphoreThreads[i].start();
Expand All @@ -1649,9 +1652,9 @@ public void run() {
}

// verifies that all semaphores are in use
assertEquals("wrong number of permits for shared semaphore",
assertEquals("immediately after command start, all shared semaphores should be in-use",
sharedSemaphore.numberOfPermits.get().longValue(), sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore",
assertEquals("immediately after command start, isolated semaphore should be in-use",
isolatedSemaphore.numberOfPermits.get().longValue(), isolatedSemaphore.getNumberOfPermitsUsed());

// signals commands to finish
Expand All @@ -1669,12 +1672,11 @@ public void run() {
}

// verifies no permits in use after finishing threads
assertEquals("wrong number of permits for shared semaphore", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("wrong number of permits for isolated semaphore", 0, isolatedSemaphore.getNumberOfPermitsUsed());
assertEquals("after all threads have finished, no shared semaphores should be in-use", 0, sharedSemaphore.getNumberOfPermitsUsed());
assertEquals("after all threads have finished, isolated semaphore not in-use", 0, isolatedSemaphore.getNumberOfPermitsUsed());

// verifies that some executions failed
final int expectedFailures = sharedSemaphore.getNumberOfPermitsUsed();
assertEquals("failures expected but did not happen", expectedFailures, failureCount.get());
assertEquals("expected some of shared semaphore commands to get rejected", sharedSemaphore.numberOfPermits.get().longValue(), failureCount.get());
}

/**
Expand Down

0 comments on commit 2e08019

Please sign in to comment.