Skip to content

Commit

Permalink
Updates from feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettjonesgoogle committed Mar 25, 2016
1 parent 38d978f commit 7d7e315
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ public void validateItem(T message) {

@Override
public void processBundle(List<T> bundle) {
List<T> bundleCopy = new ArrayList<>();
bundleCopy.addAll(bundle);
bundles.add(bundleCopy);
bundles.add(new ArrayList<>(bundle));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class NumericThreshold<E> implements BundlingThreshold<E> {
* Constructs a NumericThreshold.
*
* @param threshold The value that allows an event to happen.
* @param limit The value that forces an event to happen.
* @param limit The value that forces an event to happen. If null, then this is
* not enforced.
* @param extractor Object that extracts a numeric value from the value object.
*/
public NumericThreshold(long threshold, @Nullable Long limit, ElementCounter<E> extractor) {
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,38 +151,34 @@ public ThresholdBundleHandle add(E e) {
final Lock lock = this.lock;
lock.lock();
try {
for (BundlingThreshold<E> threshold : thresholdPrototypes) {
if (!threshold.canAccept(e)) {
throw new IllegalArgumentException("Single item too large for bundle");
}
}
validateLimits(e);

boolean signal = false;
boolean signalBundleIsReady = false;
Bundle bundleOfAddedItem = null;
if (currentOpenBundle == null) {
currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay);
currentOpenBundle.start();
signal = true;
signalBundleIsReady = true;
}

if (currentOpenBundle.canAccept(e)) {
currentOpenBundle.add(e);
bundleOfAddedItem = currentOpenBundle;
if (currentOpenBundle.isAnyThresholdReached()) {
signal = true;
signalBundleIsReady = true;
closedBundles.add(currentOpenBundle);
currentOpenBundle = null;
}
} else {
signal = true;
signalBundleIsReady = true;
closedBundles.add(currentOpenBundle);
currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay);
currentOpenBundle.start();
currentOpenBundle.add(e);
bundleOfAddedItem = currentOpenBundle;
}

if (signal) {
if (signalBundleIsReady) {
bundleCondition.signalAll();
}
return bundleOfAddedItem;
Expand Down Expand Up @@ -282,6 +278,14 @@ public boolean isEmpty() {
}
}

private void validateLimits(E e) {
for (BundlingThreshold<E> threshold : thresholdPrototypes) {
if (!threshold.canAccept(e)) {
throw new IllegalArgumentException("Single item too large for bundle");
}
}
}

private boolean shouldWait() {
if (closedBundles.size() > 0) {
return false;
Expand Down

0 comments on commit 7d7e315

Please sign in to comment.