From 38d978f10b4eea01c5c4d106e525dbb11b2cf773 Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Thu, 24 Mar 2016 16:05:25 -0700 Subject: [PATCH 1/2] Supporting bundling limits --- .../bundling/AccumulatingBundleReceiver.java | 4 +- .../api/gax/bundling/BundlingThreshold.java | 6 + .../api/gax/bundling/BundlingThresholds.java | 2 +- .../api/gax/bundling/NumericThreshold.java | 21 +- .../api/gax/bundling/ThresholdBundler.java | 199 +++++++++++------- .../bundling/ThresholdBundlingForwarder.java | 8 +- .../com/google/api/gax/grpc/ApiCallable.java | 16 +- .../google/api/gax/grpc/BundlerFactory.java | 16 +- .../google/api/gax/grpc/BundlingSettings.java | 26 +++ .../api/gax/grpc/ServiceApiSettings.java | 4 +- .../gax/bundling/ThresholdBundlerTest.java | 111 +++++++++- 11 files changed, 314 insertions(+), 99 deletions(-) diff --git a/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java b/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java index 16b2a7d34923..2692edf528fa 100644 --- a/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java +++ b/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java @@ -48,7 +48,9 @@ public void validateItem(T message) { @Override public void processBundle(List bundle) { - bundles.add(bundle); + List bundleCopy = new ArrayList<>(); + bundleCopy.addAll(bundle); + bundles.add(bundleCopy); } /** diff --git a/src/main/java/com/google/api/gax/bundling/BundlingThreshold.java b/src/main/java/com/google/api/gax/bundling/BundlingThreshold.java index cbc12b105b81..fe40d876b926 100644 --- a/src/main/java/com/google/api/gax/bundling/BundlingThreshold.java +++ b/src/main/java/com/google/api/gax/bundling/BundlingThreshold.java @@ -38,6 +38,12 @@ */ public interface BundlingThreshold { + /** + * Returns true if adding this value would breach the limit for the value in + * this threshold. + */ + boolean canAccept(E e); + /** * Presents the element to the threshold for the attribute of interest to be accumulated. * diff --git a/src/main/java/com/google/api/gax/bundling/BundlingThresholds.java b/src/main/java/com/google/api/gax/bundling/BundlingThresholds.java index 37af5e9eb480..edfb5617e24c 100644 --- a/src/main/java/com/google/api/gax/bundling/BundlingThresholds.java +++ b/src/main/java/com/google/api/gax/bundling/BundlingThresholds.java @@ -45,7 +45,7 @@ public class BundlingThresholds { */ public static ImmutableList> of(long elementThreshold) { BundlingThreshold bundlingThreshold = - new NumericThreshold(elementThreshold, new ElementCounter() { + new NumericThreshold(elementThreshold, null, new ElementCounter() { @Override public long count(E e) { return 1; diff --git a/src/main/java/com/google/api/gax/bundling/NumericThreshold.java b/src/main/java/com/google/api/gax/bundling/NumericThreshold.java index 863199b77c2c..da9be9ca1f7e 100644 --- a/src/main/java/com/google/api/gax/bundling/NumericThreshold.java +++ b/src/main/java/com/google/api/gax/bundling/NumericThreshold.java @@ -33,24 +33,41 @@ import com.google.common.base.Preconditions; +import javax.annotation.Nullable; + /** * A threshold which accumulates a count based on the provided * ElementCounter. */ public class NumericThreshold implements BundlingThreshold { private final long threshold; + private final Long limit; private final ElementCounter extractor; private long sum; /** * Constructs a NumericThreshold. + * + * @param threshold The value that allows an event to happen. + * @param limit The value that forces an event to happen. + * @param extractor Object that extracts a numeric value from the value object. */ - public NumericThreshold(long threshold, ElementCounter extractor) { + public NumericThreshold(long threshold, @Nullable Long limit, ElementCounter extractor) { this.threshold = threshold; + this.limit = limit; this.extractor = Preconditions.checkNotNull(extractor); this.sum = 0; } + @Override + public boolean canAccept(E e) { + if (limit == null) { + return true; + } else { + return sum + extractor.count(e) <= limit; + } + } + @Override public void accumulate(E e) { sum += extractor.count(e); @@ -63,6 +80,6 @@ public boolean isThresholdReached() { @Override public BundlingThreshold copyWithZeroedValue() { - return new NumericThreshold(threshold, extractor); + return new NumericThreshold(threshold, limit, extractor); } } diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java index 43a478f9e6a7..eaeabc6c23b5 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java @@ -52,26 +52,23 @@ */ public class ThresholdBundler { - private ImmutableList> thresholds; - private ImmutableList> externalThresholds; + private ImmutableList> thresholdPrototypes; + private ImmutableList> externalThresholdPrototypes; private final Duration maxDelay; private final Lock lock = new ReentrantLock(); private final Condition bundleCondition = lock.newCondition(); - private boolean bundleReady = false; - private BundleHandle currentBundleHandle; - - private Stopwatch bundleStopwatch; - private final List data = new ArrayList<>(); + private Bundle currentOpenBundle; + private List closedBundles = new ArrayList<>(); private ThresholdBundler(ImmutableList> thresholds, ImmutableList> externalThresholds, Duration maxDelay) { - this.thresholds = copyResetThresholds(Preconditions.checkNotNull(thresholds)); - this.externalThresholds = copyResetExternalThresholds( + this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds)); + this.externalThresholdPrototypes = copyResetExternalThresholds( Preconditions.checkNotNull(externalThresholds)); this.maxDelay = maxDelay; - this.currentBundleHandle = new BundleHandle(externalThresholds); + this.currentOpenBundle = null; } /** @@ -154,32 +151,41 @@ public ThresholdBundleHandle add(E e) { final Lock lock = this.lock; lock.lock(); try { + for (BundlingThreshold threshold : thresholdPrototypes) { + if (!threshold.canAccept(e)) { + throw new IllegalArgumentException("Single item too large for bundle"); + } + } + boolean signal = false; - // TODO verify invariant: bundleStopwatch == null iff size() == 0 - if (data.size() == 0) { - bundleStopwatch = Stopwatch.createStarted(); - // we want to trigger the signal so that we switch the await from an unbounded - // await to a time-bounded await. + Bundle bundleOfAddedItem = null; + if (currentOpenBundle == null) { + currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay); + currentOpenBundle.start(); signal = true; - for (ExternalThreshold threshold : externalThresholds) { - threshold.startBundle(); - } } - data.add(e); - if (!bundleReady) { - for (BundlingThreshold threshold : thresholds) { - threshold.accumulate(e); - if (threshold.isThresholdReached()) { - bundleReady = true; - signal = true; - break; - } + + if (currentOpenBundle.canAccept(e)) { + currentOpenBundle.add(e); + bundleOfAddedItem = currentOpenBundle; + if (currentOpenBundle.isAnyThresholdReached()) { + signal = true; + closedBundles.add(currentOpenBundle); + currentOpenBundle = null; } + } else { + signal = true; + closedBundles.add(currentOpenBundle); + currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay); + currentOpenBundle.start(); + currentOpenBundle.add(e); + bundleOfAddedItem = currentOpenBundle; } + if (signal) { bundleCondition.signalAll(); } - return currentBundleHandle; + return bundleOfAddedItem; } finally { lock.unlock(); } @@ -193,7 +199,10 @@ public void flush() { final Lock lock = this.lock; lock.lock(); try { - bundleReady = true; + if (currentOpenBundle != null) { + closedBundles.add(currentOpenBundle); + currentOpenBundle = null; + } bundleCondition.signalAll(); } finally { lock.unlock(); @@ -206,23 +215,24 @@ public void flush() { * * @return the number of items added to 'bundle'. */ - public int drainTo(Collection bundle) { + public int drainNextBundleTo(Collection outputCollection) { final Lock lock = this.lock; lock.lock(); try { - int dataSize = data.size(); - - bundle.addAll(data); - data.clear(); - currentBundleHandle = new BundleHandle(externalThresholds); - - thresholds = copyResetThresholds(thresholds); - externalThresholds = copyResetExternalThresholds(externalThresholds); - - bundleStopwatch = null; - bundleReady = false; + Bundle outBundle = null; + if (closedBundles.size() > 0) { + outBundle = closedBundles.remove(0); + } else if (currentOpenBundle != null) { + outBundle = currentOpenBundle; + currentOpenBundle = null; + } - return dataSize; + if (outBundle != null) { + outputCollection.addAll(outBundle.getData()); + return outputCollection.size(); + } else { + return 0; + } } finally { lock.unlock(); } @@ -236,66 +246,53 @@ public List takeBundle() throws InterruptedException { lock.lockInterruptibly(); try { while (shouldWait()) { - if (data.size() == 0 || maxDelay == null) { + if (currentOpenBundle == null || maxDelay == null) { // if an element gets added, this will be signaled, then we will re-check the while-loop // condition to see if the delay or other thresholds have been exceeded, // and if none of these are true, then we will arrive at the time-bounded // await in the else clause. bundleCondition.await(); } else { - bundleCondition.await(getDelayLeft().getMillis(), TimeUnit.MILLISECONDS); + bundleCondition.await(currentOpenBundle.getDelayLeft().getMillis(), + TimeUnit.MILLISECONDS); } } + List bundle = new ArrayList<>(); - drainTo(bundle); + drainNextBundleTo(bundle); return bundle; } finally { lock.unlock(); } } - /** - * Returns the number of elements queued up in the bundler. - */ - public int size() { + public boolean isEmpty() { final Lock lock = this.lock; lock.lock(); try { - return data.size(); - } finally { - lock.unlock(); - } - } - - /** - * Returns the elements queued up in the bundler. - */ - public Object[] toArray() { - final Lock lock = this.lock; - lock.lock(); - try { - return data.toArray(); + if (closedBundles.size() > 0) { + return false; + } + if (currentOpenBundle != null) { + return false; + } + return true; } finally { lock.unlock(); } } private boolean shouldWait() { - if (data.size() == 0) { - return true; - } - if (bundleReady) { + if (closedBundles.size() > 0) { return false; } + if (currentOpenBundle == null) { + return true; + } if (maxDelay == null) { return true; } - return getDelayLeft().getMillis() > 0; - } - - // pre-condition: data.size() > 0 ( === bundleStopwatch != null) - private Duration getDelayLeft() { - return Duration.millis(maxDelay.getMillis() - bundleStopwatch.elapsed(TimeUnit.MILLISECONDS)); + return currentOpenBundle.getDelayLeft().getMillis() > 0; } private static ImmutableList> copyResetThresholds( @@ -324,11 +321,59 @@ private static ImmutableList> copyResetExternalThreshol * a ThresholdBundler, but only if the bundle referenced is still the active * one. */ - private class BundleHandle implements ThresholdBundleHandle { + private class Bundle implements ThresholdBundleHandle { + private final ImmutableList> thresholds; private final ImmutableList> externalThresholds; + private final Duration maxDelay; + private final List data = new ArrayList<>(); + private Stopwatch stopwatch; + + private Bundle(ImmutableList> thresholds, + ImmutableList> externalThresholds, + Duration maxDelay) { + this.thresholds = copyResetThresholds(thresholds); + this.externalThresholds = copyResetExternalThresholds(externalThresholds); + this.maxDelay = maxDelay; + } - private BundleHandle(ImmutableList> externalThresholds) { - this.externalThresholds = externalThresholds; + private void start() { + stopwatch = Stopwatch.createStarted(); + for (ExternalThreshold threshold : externalThresholds) { + threshold.startBundle(); + } + } + + private boolean canAccept(E e) { + for (BundlingThreshold threshold : thresholds) { + if (!threshold.canAccept(e)) { + return false; + } + } + return true; + } + + private void add(E e) { + data.add(e); + for (BundlingThreshold threshold : thresholds) { + threshold.accumulate(e); + } + } + + private List getData() { + return data; + } + + private Duration getDelayLeft() { + return Duration.millis(maxDelay.getMillis() - stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + + private boolean isAnyThresholdReached() { + for (BundlingThreshold threshold : thresholds) { + if (threshold.isThresholdReached()) { + return true; + } + } + return false; } @Override @@ -351,7 +396,7 @@ public void flush() { lock.lock(); try { - if (ThresholdBundler.this.currentBundleHandle != this) { + if (ThresholdBundler.this.currentOpenBundle != this) { return; } ThresholdBundler.this.flush(); diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java index ea84ca11e2dd..a050a8b5be1c 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java @@ -96,9 +96,11 @@ public void run() { } } while (!Thread.currentThread().isInterrupted()); - List lastBundle = new ArrayList<>(); - bundler.drainTo(lastBundle); - processBundle(lastBundle); + List bundleData = new ArrayList<>(); + while (bundler.drainNextBundleTo(bundleData) > 0) { + processBundle(bundleData); + bundleData = new ArrayList<>(); + } } private void processBundle(List bundle) { diff --git a/src/main/java/com/google/api/gax/grpc/ApiCallable.java b/src/main/java/com/google/api/gax/grpc/ApiCallable.java index 60ca257c60b5..bebc5ab2b633 100644 --- a/src/main/java/com/google/api/gax/grpc/ApiCallable.java +++ b/src/main/java/com/google/api/gax/grpc/ApiCallable.java @@ -288,6 +288,7 @@ public static BundlableApiCallableInfo extends Builder { private final BundlingDescriptor bundlingDescriptor; + private final BundlingSettings defaultBundlingSettings; private BundlingSettings bundlingSettings; @@ -301,14 +302,25 @@ public static class BundlableBuilder extends Builder grpcMethodDescriptor, - BundlingDescriptor bundlingDescriptor) { + BundlingDescriptor bundlingDescriptor, + BundlingSettings defaultBundlingSettings) { super(grpcMethodDescriptor); this.bundlingDescriptor = bundlingDescriptor; + this.defaultBundlingSettings = defaultBundlingSettings; this.bundlingSettings = null; } /** - * Provides the bundling settings to use. + * Returns the default bundling settings for this Api call, which are set to + * safe defaults. + */ + public BundlingSettings getDefaultBundlingSettings() { + return defaultBundlingSettings; + } + + /** + * Provides the bundling settings to use. The BundlingSettings returned from + * getDefaultBundlingSettings contains the recommended defaults. */ public BundlableBuilder setBundlingSettings( BundlingSettings bundlingSettings) { diff --git a/src/main/java/com/google/api/gax/grpc/BundlerFactory.java b/src/main/java/com/google/api/gax/grpc/BundlerFactory.java index c75de14014cd..febb6d7db6be 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlerFactory.java +++ b/src/main/java/com/google/api/gax/grpc/BundlerFactory.java @@ -122,8 +122,14 @@ public long count(BundlingContext bundlablePublish) { } }; + Long elementCountLimit = null; + if (bundlingSettings.getElementCountLimit() != null) { + elementCountLimit = bundlingSettings.getElementCountLimit().longValue(); + } + BundlingThreshold> countThreshold = - new NumericThreshold<>(bundlingSettings.getElementCountThreshold(), elementCounter); + new NumericThreshold<>(bundlingSettings.getElementCountThreshold(), + elementCountLimit, elementCounter); listBuilder.add(countThreshold); } @@ -136,8 +142,14 @@ public long count(BundlingContext bundlablePublish) { } }; + Long requestByteLimit = null; + if (bundlingSettings.getRequestByteLimit() != null) { + requestByteLimit = bundlingSettings.getRequestByteLimit().longValue(); + } + BundlingThreshold> byteThreshold = - new NumericThreshold<>(bundlingSettings.getRequestByteThreshold(), requestByteCounter); + new NumericThreshold<>(bundlingSettings.getRequestByteThreshold(), + requestByteLimit, requestByteCounter); listBuilder.add(byteThreshold); } diff --git a/src/main/java/com/google/api/gax/grpc/BundlingSettings.java b/src/main/java/com/google/api/gax/grpc/BundlingSettings.java index acf6a7a060d7..166c0449e8cf 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingSettings.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingSettings.java @@ -52,12 +52,24 @@ public abstract class BundlingSettings { @Nullable public abstract Integer getElementCountThreshold(); + /** + * Get the element count limit to use for bundling. + */ + @Nullable + public abstract Integer getElementCountLimit(); + /** * Get the request byte threshold to use for bundling. */ @Nullable public abstract Integer getRequestByteThreshold(); + /** + * Get the request byte limit to use for bundling. + */ + @Nullable + public abstract Integer getRequestByteLimit(); + /** * Get the delay threshold to use for bundling. */ @@ -91,12 +103,26 @@ public abstract static class Builder { */ public abstract Builder setElementCountThreshold(Integer elementCountThreshold); + /** + * Set the element count limit to use for bundling. Any individual requests with + * more than this many elements will be rejected outright, and bundles will not + * be formed with more than this many elements. + */ + public abstract Builder setElementCountLimit(Integer elementCountLimit); + /** * Set the request byte threshold to use for bundling. After this many bytes * are accumulated, the elements will be wrapped up in a bundle and sent. */ public abstract Builder setRequestByteThreshold(Integer requestByteThreshold); + /** + * Set the request byte limit to use for bundling. Any individual requests with + * more than this many bytes will be rejected outright, and bundles will not + * be formed with more than this many bytes. + */ + public abstract Builder setRequestByteLimit(Integer requestByteLimit); + /** * Set the delay threshold to use for bundling. After this amount of time has * elapsed (counting from the first element added), the elements will be wrapped diff --git a/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java b/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java index beb2b809b576..0e2301204176 100644 --- a/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java +++ b/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java @@ -115,7 +115,7 @@ public boolean shouldAutoClose() { * Provides the connection settings necessary to create a channel. */ public ServiceApiSettings provideChannelWith( - final ConnectionSettings settings,final boolean shouldAutoClose) { + final ConnectionSettings settings) { channelProvider = new ChannelProvider() { private ManagedChannel channel = null; @Override @@ -137,7 +137,7 @@ public ManagedChannel getChannel(Executor executor) throws IOException { @Override public boolean shouldAutoClose() { - return shouldAutoClose; + return true; } private String serviceHeader() { diff --git a/src/test/java/com/google/api/gax/bundling/ThresholdBundlerTest.java b/src/test/java/com/google/api/gax/bundling/ThresholdBundlerTest.java index 6a8919428d65..0b230fed4188 100644 --- a/src/test/java/com/google/api/gax/bundling/ThresholdBundlerTest.java +++ b/src/test/java/com/google/api/gax/bundling/ThresholdBundlerTest.java @@ -38,6 +38,7 @@ import java.util.List; import org.joda.time.Duration; +import org.junit.Assert; import org.junit.Test; public class ThresholdBundlerTest { @@ -48,10 +49,9 @@ public void testEmptyAddAndDrain() { .setThresholds(BundlingThresholds.of(5)) .build(); List resultBundle = new ArrayList<>(); - Truth.assertThat(bundler.size()).isEqualTo(0); - Truth.assertThat(bundler.toArray()).isEqualTo(new Integer[]{}); + Truth.assertThat(bundler.isEmpty()).isTrue(); - int drained = bundler.drainTo(resultBundle); + int drained = bundler.drainNextBundleTo(resultBundle); Truth.assertThat(drained).isEqualTo(0); Truth.assertThat(resultBundle).isEqualTo(new ArrayList<>()); } @@ -62,18 +62,16 @@ public void testAddAndDrain() { .setThresholds(BundlingThresholds.of(5)) .build(); bundler.add(14); - Truth.assertThat(bundler.size()).isEqualTo(1); - Truth.assertThat(bundler.toArray()).isEqualTo(new Integer[]{14}); + Truth.assertThat(bundler.isEmpty()).isFalse(); List resultBundle = new ArrayList<>(); - int drained = bundler.drainTo(resultBundle); + int drained = bundler.drainNextBundleTo(resultBundle); Truth.assertThat(drained).isEqualTo(1); Truth.assertThat(resultBundle).isEqualTo(Arrays.asList(14)); - Truth.assertThat(bundler.size()).isEqualTo(0); - Truth.assertThat(bundler.toArray()).isEqualTo(new Integer[]{}); + Truth.assertThat(bundler.isEmpty()).isTrue(); List resultBundle2 = new ArrayList<>(); - int drained2 = bundler.drainTo(resultBundle2); + int drained2 = bundler.drainNextBundleTo(resultBundle2); Truth.assertThat(drained2).isEqualTo(0); Truth.assertThat(resultBundle2).isEqualTo(new ArrayList<>()); } @@ -167,6 +165,9 @@ public void testFlush() throws Exception { // Give time for the forwarder thread to catch the bundle Thread.sleep(100); + // should have no effect (everything should be consumed) + bundler.flush(); + } finally { forwarder.close(); } @@ -236,4 +237,96 @@ public ExternalThreshold copyWithZeroedValue() { Arrays.asList(7, 9)); Truth.assertThat(receiver.getBundles()).isEqualTo(expected); } + + private BundlingThreshold createValueThreshold(long threshold, Long limit) { + return new NumericThreshold(threshold, limit, new ElementCounter() { + @Override + public long count(Integer value) { + return value; + } + }); + } + + @Test + public void testBundlingLimit() throws Exception { + ThresholdBundler bundler = ThresholdBundler.newBuilder() + .addThreshold(createValueThreshold(4L, 5L)) + .build(); + AccumulatingBundleReceiver receiver = + new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + forwarder.start(); + bundler.add(1); + bundler.add(2); + // jumps from below the threshold to over the limit with a single element + bundler.add(3); + + bundler.add(5); + + } finally { + forwarder.close(); + } + + List> expected = + Arrays.asList( + Arrays.asList(1, 2), + Arrays.asList(3), + Arrays.asList(5)); + Truth.assertThat(receiver.getBundles()).isEqualTo(expected); + } + + @Test + public void testBundlingLimitNoForwarder() throws Exception { + ThresholdBundler bundler = ThresholdBundler.newBuilder() + .addThreshold(createValueThreshold(4L, 5L)) + .build(); + + bundler.add(1); + bundler.add(2); + // jumps from below the threshold to over the limit with a single element + bundler.add(3); + bundler.add(1); + + // There should only be closed bundles at this point + Truth.assertThat(bundler.isEmpty()).isFalse(); + + List bundle1 = new ArrayList<>(); + bundler.drainNextBundleTo(bundle1); + List bundle2 = new ArrayList<>(); + bundler.drainNextBundleTo(bundle2); + List> actualBundles = Arrays.asList(bundle1, bundle2); + + Truth.assertThat(bundler.isEmpty()).isTrue(); + + List> expected = + Arrays.asList( + Arrays.asList(1, 2), + Arrays.asList(3, 1)); + Truth.assertThat(actualBundles).isEqualTo(expected); + } + + + @Test + public void testBundlingOverLimit() throws Exception { + ThresholdBundler bundler = ThresholdBundler.newBuilder() + .addThreshold(createValueThreshold(4L, 5L)) + .build(); + AccumulatingBundleReceiver receiver = + new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + forwarder.start(); + bundler.add(6); + Assert.fail("Expected exception from bundling call"); + } catch (IllegalArgumentException e) { + // expected + } finally { + forwarder.close(); + } + } } From 7d7e315e92c45f5b7ed51d8ffc7e19c04a730a4e Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Fri, 25 Mar 2016 11:08:24 -0700 Subject: [PATCH 2/2] Updates from feedback --- .../bundling/AccumulatingBundleReceiver.java | 4 +--- .../api/gax/bundling/NumericThreshold.java | 3 ++- .../api/gax/bundling/ThresholdBundler.java | 24 +++++++++++-------- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java b/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java index 2692edf528fa..bca7b4e3d192 100644 --- a/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java +++ b/src/main/java/com/google/api/gax/bundling/AccumulatingBundleReceiver.java @@ -48,9 +48,7 @@ public void validateItem(T message) { @Override public void processBundle(List bundle) { - List bundleCopy = new ArrayList<>(); - bundleCopy.addAll(bundle); - bundles.add(bundleCopy); + bundles.add(new ArrayList<>(bundle)); } /** diff --git a/src/main/java/com/google/api/gax/bundling/NumericThreshold.java b/src/main/java/com/google/api/gax/bundling/NumericThreshold.java index da9be9ca1f7e..bed64ed30ecc 100644 --- a/src/main/java/com/google/api/gax/bundling/NumericThreshold.java +++ b/src/main/java/com/google/api/gax/bundling/NumericThreshold.java @@ -49,7 +49,8 @@ public class NumericThreshold implements BundlingThreshold { * Constructs a NumericThreshold. * * @param threshold The value that allows an event to happen. - * @param limit The value that forces an event to happen. + * @param limit The value that forces an event to happen. If null, then this is + * not enforced. * @param extractor Object that extracts a numeric value from the value object. */ public NumericThreshold(long threshold, @Nullable Long limit, ElementCounter extractor) { diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java index eaeabc6c23b5..b11a16dc135b 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java @@ -151,30 +151,26 @@ public ThresholdBundleHandle add(E e) { final Lock lock = this.lock; lock.lock(); try { - for (BundlingThreshold threshold : thresholdPrototypes) { - if (!threshold.canAccept(e)) { - throw new IllegalArgumentException("Single item too large for bundle"); - } - } + validateLimits(e); - boolean signal = false; + boolean signalBundleIsReady = false; Bundle bundleOfAddedItem = null; if (currentOpenBundle == null) { currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay); currentOpenBundle.start(); - signal = true; + signalBundleIsReady = true; } if (currentOpenBundle.canAccept(e)) { currentOpenBundle.add(e); bundleOfAddedItem = currentOpenBundle; if (currentOpenBundle.isAnyThresholdReached()) { - signal = true; + signalBundleIsReady = true; closedBundles.add(currentOpenBundle); currentOpenBundle = null; } } else { - signal = true; + signalBundleIsReady = true; closedBundles.add(currentOpenBundle); currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay); currentOpenBundle.start(); @@ -182,7 +178,7 @@ public ThresholdBundleHandle add(E e) { bundleOfAddedItem = currentOpenBundle; } - if (signal) { + if (signalBundleIsReady) { bundleCondition.signalAll(); } return bundleOfAddedItem; @@ -282,6 +278,14 @@ public boolean isEmpty() { } } + private void validateLimits(E e) { + for (BundlingThreshold threshold : thresholdPrototypes) { + if (!threshold.canAccept(e)) { + throw new IllegalArgumentException("Single item too large for bundle"); + } + } + } + private boolean shouldWait() { if (closedBundles.size() > 0) { return false;