diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java index 236a67b1f..df146e987 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java @@ -18,8 +18,8 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; import rx.Observable; -import rx.functions.Func1; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -49,25 +49,16 @@ public class HystrixConfigSseServlet extends HystrixSampleSseServlet> createStream) { - this.jsonStream = new HystrixConfigurationJsonStream(createStream); + super(HystrixConfigurationStream.getInstance().observe()); } - @Override - int getDefaultDelayInMilliseconds() { - return DEFAULT_ONNEXT_DELAY_IN_MS; + /* package-private */ HystrixConfigSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + super(sampleStream, pausePollerThreadDelayInMs); } @Override @@ -90,11 +81,6 @@ protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - @Override - protected Observable getStream(int delay) { - return jsonStream.observe(delay); - } - @Override protected String convertToString(HystrixConfiguration config) throws IOException { return HystrixConfigurationJsonStream.convertToString(config); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java index 1a2aab252..12f5f3d42 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java @@ -46,15 +46,17 @@ public class HystrixConfigurationJsonStream { private static final JsonFactory jsonFactory = new JsonFactory(); private final Func1> streamGenerator; + @Deprecated //since 1.5.4 public HystrixConfigurationJsonStream() { this.streamGenerator = new Func1>() { @Override public Observable call(Integer delay) { - return new HystrixConfigurationStream(delay).observe(); + return HystrixConfigurationStream.getInstance().observe(); } }; } + @Deprecated //since 1.5.4 public HystrixConfigurationJsonStream(Func1> streamGenerator) { this.streamGenerator = streamGenerator; } @@ -171,10 +173,25 @@ public static String convertToString(HystrixConfiguration config) throws IOExcep return jsonString.getBuffer().toString(); } + /** + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixConfigurationStream#observe()} + * @param delay interval between data emissions + * @return sampled utilization as Java object, taken on a timer + */ + @Deprecated //deprecated in 1.5.4 public Observable observe(int delay) { return streamGenerator.call(delay); } + /** + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixConfigurationStream#observe()} + * and you can map to JSON string via {@link HystrixConfigurationJsonStream#convertToString(HystrixConfiguration)} + * @param delay interval between data emissions + * @return sampled utilization as JSON string, taken on a timer + */ + @Deprecated //deprecated in 1.5.4 public Observable observeJson(int delay) { return streamGenerator.call(delay).map(convertToJson); } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java index 1353244bb..2882c0881 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java @@ -33,15 +33,27 @@ /** */ public abstract class HystrixSampleSseServlet extends HttpServlet { + protected final Observable sampleStream; private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class); + //wake up occasionally and check that poller is still alive. this value controls how often + private static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500; + + private final int pausePollerThreadDelayInMs; + /* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */ private static volatile boolean isDestroyed = false; - private static final String DELAY_REQ_PARAM_NAME = "delay"; + protected HystrixSampleSseServlet(Observable sampleStream) { + this.sampleStream = sampleStream; + this.pausePollerThreadDelayInMs = DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS; + } - abstract int getDefaultDelayInMilliseconds(); + protected HystrixSampleSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + this.sampleStream = sampleStream; + this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs; + } abstract int getMaxNumberConcurrentConnectionsAllowed(); @@ -51,7 +63,7 @@ public abstract class HystrixSampleSseServlet extends HttpServlet { protected abstract void decrementCurrentConcurrentConnections(); - protected abstract Observable getStream(int delay); + //protected abstract Observable getStream(); protected abstract String convertToString(SampleData sampleData) throws IOException; @@ -67,19 +79,6 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t } } - /* package-private */ - int getDelayFromHttpRequest(HttpServletRequest req) { - try { - String delay = req.getParameter(DELAY_REQ_PARAM_NAME); - if (delay != null) { - return Math.max(Integer.parseInt(delay), 1); - } - } catch (Throwable ex) { - //silently fail - } - return getDefaultDelayInMilliseconds(); - } - /** * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at @@ -125,8 +124,6 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse if (numberConnections > maxNumberConnectionsAllowed) { response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed); } else { - int delay = getDelayFromHttpRequest(request); - /* initialize response */ response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); @@ -134,11 +131,11 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse final PrintWriter writer = response.getWriter(); - Observable sampledStream = getStream(delay); + //Observable sampledStream = getStream(); //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext - sampleSubscription = sampledStream + sampleSubscription = sampleStream .observeOn(Schedulers.io()) .subscribe(new Subscriber() { @Override @@ -180,7 +177,7 @@ public void onNext(SampleData sampleData) { while (moreDataWillBeSent.get() && !isDestroyed) { try { - Thread.sleep(delay); + Thread.sleep(pausePollerThreadDelayInMs); } catch (InterruptedException e) { moreDataWillBeSent.set(false); } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java index dd17f41d2..c7c8dfafe 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java @@ -55,15 +55,17 @@ public String call(HystrixUtilization utilization) { } }; + @Deprecated //since 1.5.4 public HystrixUtilizationJsonStream() { this.streamGenerator = new Func1>() { @Override public Observable call(Integer delay) { - return new HystrixUtilizationStream(delay).observe(); + return HystrixUtilizationStream.getInstance().observe(); } }; } + @Deprecated //since 1.5.4 public HystrixUtilizationJsonStream(Func1> streamGenerator) { this.streamGenerator = streamGenerator; } @@ -111,10 +113,24 @@ protected static String convertToJson(HystrixUtilization utilization) throws IOE return jsonString.getBuffer().toString(); } + /** + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixUtilizationStream#observe()} + * @param delay interval between data emissions + * @return sampled utilization as Java object, taken on a timer + */ + @Deprecated //deprecated as of 1.5.4 public Observable observe(int delay) { return streamGenerator.call(delay); } + /** + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixUtilizationStream#observe()} + * and the {@link #convertToJson(HystrixUtilization)} method + * @param delay interval between data emissions + * @return sampled utilization as JSON string, taken on a timer + */ public Observable observeJson(int delay) { return streamGenerator.call(delay).map(convertToJsonFunc); } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java index 22dee84d8..19c6a85df 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java @@ -18,8 +18,8 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.metric.sample.HystrixUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; import rx.Observable; -import rx.functions.Func1; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -49,27 +49,17 @@ public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet> createStream) { - this.jsonStream = new HystrixUtilizationJsonStream(createStream); - } - - @Override - int getDefaultDelayInMilliseconds() { - return DEFAULT_ONNEXT_DELAY_IN_MS; + /* package-private */ HystrixUtilizationSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + super(sampleStream, pausePollerThreadDelayInMs); } @Override @@ -92,11 +82,6 @@ protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - @Override - protected Observable getStream(int delay) { - return jsonStream.observe(delay); - } - @Override protected String convertToString(HystrixUtilization utilization) throws IOException { return HystrixUtilizationJsonStream.convertToJson(utilization); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java index 25effdc77..dfc868b4c 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java @@ -16,7 +16,6 @@ package com.netflix.hystrix.contrib.sample.stream; import com.netflix.hystrix.config.HystrixConfiguration; -import com.netflix.hystrix.config.HystrixConfigurationStream; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,15 +57,6 @@ public HystrixConfiguration call(Long timestamp) { } }); - private Func1> generateStream(final Observable o) { - return new Func1>() { - @Override - public Observable call(Integer integer) { - return o; - } - }; - } - private final Observable streamOfOnNextThenOnError = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { @@ -109,7 +99,7 @@ public void tearDown() { @Test public void shutdownServletShouldRejectRequests() throws ServletException, IOException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10); try { servlet.init(); } catch (ServletException ex) { @@ -126,7 +116,7 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc @Test public void testConfigDataWithInfiniteOnNextStream() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10); try { servlet.init(); } catch (ServletException ex) { @@ -182,13 +172,13 @@ public void run() { Thread.sleep(100); System.out.println("WRITES : " + writes.get()); - assertEquals(9, writes.get()); + assertTrue(writes.get() >= 9); assertEquals(0, servlet.getNumberCurrentConnections()); } @Test public void testConfigDataWithStreamOnError() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnError)); + servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnError, 10); try { servlet.init(); } catch (ServletException ex) { @@ -241,7 +231,7 @@ public void run() { @Test public void testConfigDataWithStreamOnCompleted() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnCompleted)); + servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnCompleted, 10); try { servlet.init(); } catch (ServletException ex) { @@ -294,7 +284,7 @@ public void run() { @Test public void testConfigDataWithIoExceptionOnWrite() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10); try { servlet.init(); } catch (ServletException ex) { @@ -303,7 +293,6 @@ public void testConfigDataWithIoExceptionOnWrite() throws IOException, Interrupt final AtomicInteger writes = new AtomicInteger(0); - when(mockReq.getParameter("delay")).thenReturn("100"); when(mockResp.getWriter()).thenReturn(mockPrintWriter); Mockito.doAnswer(new Answer() { @Override diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java index 47badcae2..627be3616 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java @@ -32,10 +32,6 @@ import java.util.function.Supplier; class EventStream implements Supplier> { - - private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500; - private final static int UTILIZATION_DATA_INTERVAL_IN_MS = 500; - private final Observable source; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); @@ -57,7 +53,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) { switch (eventStreamEnum) { case CONFIG_STREAM: - source = new HystrixConfigurationStream(CONFIGURATION_DATA_INTERVAL_IN_MS) + source = HystrixConfigurationStream.getInstance() .observe() .map(SerialHystrixConfiguration::toBytes) .map(SerialHystrixMetric::toPayload); @@ -69,7 +65,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) { .map(SerialHystrixMetric::toPayload); break; case UTILIZATION_STREAM: - source = new HystrixUtilizationStream(UTILIZATION_DATA_INTERVAL_IN_MS) + source = HystrixUtilizationStream.getInstance() .observe() .map(SerialHystrixUtilization::toBytes) .map(SerialHystrixMetric::toPayload); diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java index 9a4d013b5..8057198d0 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java @@ -81,6 +81,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { + e.printStackTrace(); System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnError : " + e); latch.countDown(); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java index 290497690..18eb224d5 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java @@ -26,12 +26,13 @@ import com.netflix.hystrix.HystrixThreadPoolMetrics; import com.netflix.hystrix.HystrixThreadPoolProperties; import rx.Observable; -import rx.functions.Func0; +import rx.functions.Action0; import rx.functions.Func1; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class samples current Hystrix configuration and exposes that as a stream @@ -39,38 +40,83 @@ public class HystrixConfigurationStream { private final int intervalInMilliseconds; - private final Observable timer; + private final Observable allConfigurationStream; + private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + private static final Func1 getAllConfig = + new Func1() { + @Override + public HystrixConfiguration call(Long timestamp) { + return HystrixConfiguration.from( + getAllCommandConfig.call(timestamp), + getAllThreadPoolConfig.call(timestamp), + getAllCollapserConfig.call(timestamp) + ); + } + }; + + /** + * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing + * @param intervalInMilliseconds milliseconds between data emissions + */ + @Deprecated //deprecated in 1.5.4. public HystrixConfigurationStream(final int intervalInMilliseconds) { this.intervalInMilliseconds = intervalInMilliseconds; - this.timer = Observable.defer(new Func0>() { - @Override - public Observable call() { - return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS); - } - }); + this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS) + .map(getAllConfig) + .doOnSubscribe(new Action0() { + @Override + public void call() { + isSourceCurrentlySubscribed.set(true); + } + }) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + isSourceCurrentlySubscribed.set(false); + } + }) + .share() + .onBackpressureDrop(); + } + + private static final HystrixConfigurationStream INSTANCE = new HystrixConfigurationStream(500); + + public static HystrixConfigurationStream getInstance() { + return INSTANCE; } + static HystrixConfigurationStream getNonSingletonInstanceOnlyUsedInUnitTests(int delayInMs) { + return new HystrixConfigurationStream(delayInMs); + } + + /** + * Return a ref-counted stream that will only do work when at least one subscriber is present + */ public Observable observe() { - return timer.map(getAllConfig); + return allConfigurationStream; } public Observable> observeCommandConfiguration() { - return timer.map(getAllCommandConfig); + return allConfigurationStream.map(getOnlyCommandConfig); } public Observable> observeThreadPoolConfiguration() { - return timer.map(getAllThreadPoolConfig); + return allConfigurationStream.map(getOnlyThreadPoolConfig); } public Observable> observeCollapserConfiguration() { - return timer.map(getAllCollapserConfig); + return allConfigurationStream.map(getOnlyCollapserConfig); } public int getIntervalInMilliseconds() { return this.intervalInMilliseconds; } + public boolean isSourceCurrentlySubscribed() { + return isSourceCurrentlySubscribed.get(); + } + private static HystrixCommandConfiguration sampleCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, HystrixCommandProperties commandProperties) { return HystrixCommandConfiguration.sample(commandKey, threadPoolKey, groupKey, commandProperties); @@ -125,15 +171,29 @@ public Map call(Long timesta } }; - private static final Func1 getAllConfig = - new Func1() { + + + private static final Func1> getOnlyCommandConfig = + new Func1>() { @Override - public HystrixConfiguration call(Long timestamp) { - return HystrixConfiguration.from( - getAllCommandConfig.call(timestamp), - getAllThreadPoolConfig.call(timestamp), - getAllCollapserConfig.call(timestamp) - ); + public Map call(HystrixConfiguration hystrixConfiguration) { + return hystrixConfiguration.getCommandConfig(); + } + }; + + private static final Func1> getOnlyThreadPoolConfig = + new Func1>() { + @Override + public Map call(HystrixConfiguration hystrixConfiguration) { + return hystrixConfiguration.getThreadPoolConfig(); + } + }; + + private static final Func1> getOnlyCollapserConfig = + new Func1>() { + @Override + public Map call(HystrixConfiguration hystrixConfiguration) { + return hystrixConfiguration.getCollapserConfig(); } }; } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java index b6fea2487..aaff7dcdc 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java @@ -20,47 +20,91 @@ import com.netflix.hystrix.HystrixThreadPoolKey; import com.netflix.hystrix.HystrixThreadPoolMetrics; import rx.Observable; -import rx.functions.Func0; +import rx.functions.Action0; import rx.functions.Func1; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class samples current Hystrix utilization of resources and exposes that as a stream */ public class HystrixUtilizationStream { - private final int intervalInMilliseconds; - private final Observable timer; + private final Observable allUtilizationStream; + private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + + private static final Func1 getAllUtilization = + new Func1() { + @Override + public HystrixUtilization call(Long timestamp) { + return HystrixUtilization.from( + getAllCommandUtilization.call(timestamp), + getAllThreadPoolUtilization.call(timestamp) + ); + } + }; + /** + * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing + * @param intervalInMilliseconds milliseconds between data emissions + */ + @Deprecated //deprecated in 1.5.4. public HystrixUtilizationStream(final int intervalInMilliseconds) { this.intervalInMilliseconds = intervalInMilliseconds; - this.timer = Observable.defer(new Func0>() { - @Override - public Observable call() { - return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS); - } - }); + this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS) + .map(getAllUtilization) + .doOnSubscribe(new Action0() { + @Override + public void call() { + isSourceCurrentlySubscribed.set(true); + } + }) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + isSourceCurrentlySubscribed.set(false); + } + }) + .share() + .onBackpressureDrop(); + } + + private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(500); + + public static HystrixUtilizationStream getInstance() { + return INSTANCE; + } + + static HystrixUtilizationStream getNonSingletonInstanceOnlyUsedInUnitTests(int delayInMs) { + return new HystrixUtilizationStream(delayInMs); } + /** + * Return a ref-counted stream that will only do work when at least one subscriber is present + */ public Observable observe() { - return timer.map(getAllUtilization); + return allUtilizationStream; } public Observable> observeCommandUtilization() { - return timer.map(getAllCommandUtilization); + return allUtilizationStream.map(getOnlyCommandUtilization); } public Observable> observeThreadPoolUtilization() { - return timer.map(getAllThreadPoolUtilization); + return allUtilizationStream.map(getOnlyThreadPoolUtilization); } public int getIntervalInMilliseconds() { return this.intervalInMilliseconds; } + public boolean isSourceCurrentlySubscribed() { + return isSourceCurrentlySubscribed.get(); + } + private static HystrixCommandUtilization sampleCommandUtilization(HystrixCommandMetrics commandMetrics) { return HystrixCommandUtilization.sample(commandMetrics); } @@ -95,14 +139,19 @@ public Map call(Long timesta } }; - private static final Func1 getAllUtilization = - new Func1() { + private static final Func1> getOnlyCommandUtilization = + new Func1>() { @Override - public HystrixUtilization call(Long timestamp) { - return HystrixUtilization.from( - getAllCommandUtilization.call(timestamp), - getAllThreadPoolUtilization.call(timestamp) - ); + public Map call(HystrixUtilization hystrixUtilization) { + return hystrixUtilization.getCommandUtilizationMap(); + } + }; + + private static final Func1> getOnlyThreadPoolUtilization = + new Func1>() { + @Override + public Map call(HystrixUtilization hystrixUtilization) { + return hystrixUtilization.getThreadPoolUtilizationMap(); } }; } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java new file mode 100644 index 000000000..c3ea1d8ea --- /dev/null +++ b/hystrix-core/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java @@ -0,0 +1,326 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.config; + +import com.hystrix.junit.HystrixRequestContextRule; +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.metric.CommandStreamTest; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.schedulers.Schedulers; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class HystrixConfigurationStreamTest extends CommandStreamTest { + + @Rule + public HystrixRequestContextRule ctx = new HystrixRequestContextRule(); + + HystrixConfigurationStream stream; + private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Config"); + private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("Command"); + + @Before + public void init() { + stream = HystrixConfigurationStream.getNonSingletonInstanceOnlyUsedInUnitTests(10); + } + + @Test + public void testStreamHasData() throws Exception { + final AtomicBoolean commandShowsUp = new AtomicBoolean(false); + final AtomicBoolean threadPoolShowsUp = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + for (int i = 0; i < 2; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.observe(); + } + + stream.observe().take(NUM).subscribe( + new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(HystrixConfiguration configuration) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + configuration.getCommandConfig().size() + " commands"); + if (configuration.getCommandConfig().containsKey(commandKey)) { + commandShowsUp.set(true); + } + if (!configuration.getThreadPoolConfig().isEmpty()) { + threadPoolShowsUp.set(true); + } + } + }); + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(commandShowsUp.get()); + assertTrue(threadPoolShowsUp.get()); + } + + @Test + public void testTwoSubscribersOneUnsubscribes() throws Exception { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicInteger payloads1 = new AtomicInteger(0); + final AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch1.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(HystrixConfiguration configuration) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + configuration); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch2.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(HystrixConfiguration configuration) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + configuration); + payloads2.incrementAndGet(); + } + }); + //execute 1 command, then unsubscribe from first stream. then execute the rest + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.execute(); + if (i == 1) { + s1.unsubscribe(); + } + } + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + assertTrue("s1 got less data than s2", payloads2.get() > payloads1.get()); + } + + @Test + public void testTwoSubscribersBothUnsubscribe() throws Exception { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicInteger payloads1 = new AtomicInteger(0); + final AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch1.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(HystrixConfiguration configuration) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + configuration); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch2.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(HystrixConfiguration configuration) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + configuration); + payloads2.incrementAndGet(); + } + }); + //execute 2 commands, then unsubscribe from both streams, then execute the rest + for (int i = 0; i < 10; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.execute(); + if (i == 2) { + s1.unsubscribe(); + s2.unsubscribe(); + } + } + assertFalse(stream.isSourceCurrentlySubscribed()); //both subscriptions have been cancelled - source should be too + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + } + + @Test + public void testTwoSubscribersOneSlowOneFast() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean foundError = new AtomicBoolean(false); + + Observable fast = stream + .observe() + .observeOn(Schedulers.newThread()); + Observable slow = stream + .observe() + .observeOn(Schedulers.newThread()) + .map(new Func1() { + @Override + public HystrixConfiguration call(HystrixConfiguration config) { + try { + Thread.sleep(100); + return config; + } catch (InterruptedException ex) { + return config; + } + } + }); + + Observable checkZippedEqual = Observable.zip(fast, slow, new Func2() { + @Override + public Boolean call(HystrixConfiguration payload, HystrixConfiguration payload2) { + return payload == payload2; + } + }); + + Subscription s1 = checkZippedEqual + .take(10000) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e); + e.printStackTrace(); + foundError.set(true); + latch.countDown(); + } + + @Override + public void onNext(Boolean b) { + //System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + b); + } + }); + + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.execute(); + } + + latch.await(10000, TimeUnit.MILLISECONDS); + assertFalse(foundError.get()); + } +} diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java new file mode 100644 index 000000000..959ac1cc9 --- /dev/null +++ b/hystrix-core/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java @@ -0,0 +1,324 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric.sample; + +import com.hystrix.junit.HystrixRequestContextRule; +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.metric.CommandStreamTest; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.schedulers.Schedulers; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class HystrixUtilizationStreamTest extends CommandStreamTest { + + @Rule + public HystrixRequestContextRule ctx = new HystrixRequestContextRule(); + + HystrixUtilizationStream stream; + private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Util"); + private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("Command"); + + @Before + public void init() { + stream = HystrixUtilizationStream.getNonSingletonInstanceOnlyUsedInUnitTests(10); + } + + @Test + public void testStreamHasData() throws Exception { + final AtomicBoolean commandShowsUp = new AtomicBoolean(false); + final AtomicBoolean threadPoolShowsUp = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + for (int i = 0; i < 2; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.observe(); + } + + stream.observe().take(NUM).subscribe( + new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(HystrixUtilization utilization) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + utilization.getCommandUtilizationMap().size() + " commands"); + if (utilization.getCommandUtilizationMap().containsKey(commandKey)) { + commandShowsUp.set(true); + } + if (!utilization.getThreadPoolUtilizationMap().isEmpty()) { + threadPoolShowsUp.set(true); + } + } + }); + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(commandShowsUp.get()); + assertTrue(threadPoolShowsUp.get()); + } + + @Test + public void testTwoSubscribersOneUnsubscribes() throws Exception { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicInteger payloads1 = new AtomicInteger(0); + final AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch1.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(HystrixUtilization utilization) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + utilization); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch2.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(HystrixUtilization utilization) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + utilization); + payloads2.incrementAndGet(); + } + }); + //execute 1 command, then unsubscribe from first stream. then execute the rest + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.execute(); + if (i == 1) { + s1.unsubscribe(); + } + } + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + assertTrue("s1 got less data than s2", payloads2.get() > payloads1.get()); + } + + @Test + public void testTwoSubscribersBothUnsubscribe() throws Exception { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicInteger payloads1 = new AtomicInteger(0); + final AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch1.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(HystrixUtilization utilization) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + utilization); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .observe() + .take(100) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch2.countDown(); + } + }) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(HystrixUtilization utilization) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + utilization); + payloads2.incrementAndGet(); + } + }); + //execute 2 commands, then unsubscribe from both streams, then execute the rest + for (int i = 0; i < 10; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.execute(); + if (i == 2) { + s1.unsubscribe(); + s2.unsubscribe(); + } + } + assertFalse(stream.isSourceCurrentlySubscribed()); //both subscriptions have been cancelled - source should be too + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + } + + @Test + public void testTwoSubscribersOneSlowOneFast() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean foundError = new AtomicBoolean(false); + + Observable fast = stream + .observe() + .observeOn(Schedulers.newThread()); + Observable slow = stream + .observe() + .observeOn(Schedulers.newThread()) + .map(new Func1() { + @Override + public HystrixUtilization call(HystrixUtilization util) { + try { + Thread.sleep(100); + return util; + } catch (InterruptedException ex) { + return util; + } + } + }); + + Observable checkZippedEqual = Observable.zip(fast, slow, new Func2() { + @Override + public Boolean call(HystrixUtilization payload, HystrixUtilization payload2) { + return payload == payload2; + } + }); + + Subscription s1 = checkZippedEqual + .take(10000) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e); + e.printStackTrace(); + foundError.set(true); + latch.countDown(); + } + + @Override + public void onNext(Boolean b) { + //System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + b); + } + }); + + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); + cmd.execute(); + } + + latch.await(10000, TimeUnit.MILLISECONDS); + assertFalse(foundError.get()); + } +}