Skip to content

Commit

Permalink
Merge pull request googleapis#37 from garrettjonesgoogle/master
Browse files Browse the repository at this point in the history
Supporting bundling limits
  • Loading branch information
garrettjonesgoogle committed Mar 25, 2016
2 parents 4dcc19c + 7d7e315 commit a02a695
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void validateItem(T message) {

@Override
public void processBundle(List<T> bundle) {
bundles.add(bundle);
bundles.add(new ArrayList<>(bundle));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
*/
public interface BundlingThreshold<E> {

/**
* Returns true if adding this value would breach the limit for the value in
* this threshold.
*/
boolean canAccept(E e);

/**
* Presents the element to the threshold for the attribute of interest to be accumulated.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class BundlingThresholds {
*/
public static <E> ImmutableList<BundlingThreshold<E>> of(long elementThreshold) {
BundlingThreshold<E> bundlingThreshold =
new NumericThreshold<E>(elementThreshold, new ElementCounter<E>() {
new NumericThreshold<E>(elementThreshold, null, new ElementCounter<E>() {
@Override
public long count(E e) {
return 1;
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/com/google/api/gax/bundling/NumericThreshold.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,42 @@

import com.google.common.base.Preconditions;

import javax.annotation.Nullable;

/**
* A threshold which accumulates a count based on the provided
* ElementCounter.
*/
public class NumericThreshold<E> implements BundlingThreshold<E> {
private final long threshold;
private final Long limit;
private final ElementCounter<E> extractor;
private long sum;

/**
* Constructs a NumericThreshold.
*
* @param threshold The value that allows an event to happen.
* @param limit The value that forces an event to happen. If null, then this is
* not enforced.
* @param extractor Object that extracts a numeric value from the value object.
*/
public NumericThreshold(long threshold, ElementCounter<E> extractor) {
public NumericThreshold(long threshold, @Nullable Long limit, ElementCounter<E> extractor) {
this.threshold = threshold;
this.limit = limit;
this.extractor = Preconditions.checkNotNull(extractor);
this.sum = 0;
}

@Override
public boolean canAccept(E e) {
if (limit == null) {
return true;
} else {
return sum + extractor.count(e) <= limit;
}
}

@Override
public void accumulate(E e) {
sum += extractor.count(e);
Expand All @@ -63,6 +81,6 @@ public boolean isThresholdReached() {

@Override
public BundlingThreshold<E> copyWithZeroedValue() {
return new NumericThreshold<E>(threshold, extractor);
return new NumericThreshold<E>(threshold, limit, extractor);
}
}
203 changes: 126 additions & 77 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,23 @@
*/
public class ThresholdBundler<E> {

private ImmutableList<BundlingThreshold<E>> thresholds;
private ImmutableList<ExternalThreshold<E>> externalThresholds;
private ImmutableList<BundlingThreshold<E>> thresholdPrototypes;
private ImmutableList<ExternalThreshold<E>> externalThresholdPrototypes;
private final Duration maxDelay;

private final Lock lock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private boolean bundleReady = false;
private BundleHandle currentBundleHandle;

private Stopwatch bundleStopwatch;
private final List<E> data = new ArrayList<>();
private Bundle currentOpenBundle;
private List<Bundle> closedBundles = new ArrayList<>();

private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds,
ImmutableList<ExternalThreshold<E>> externalThresholds,
Duration maxDelay) {
this.thresholds = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.externalThresholds = copyResetExternalThresholds(
this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.externalThresholdPrototypes = copyResetExternalThresholds(
Preconditions.checkNotNull(externalThresholds));
this.maxDelay = maxDelay;
this.currentBundleHandle = new BundleHandle(externalThresholds);
this.currentOpenBundle = null;
}

/**
Expand Down Expand Up @@ -154,32 +151,37 @@ public ThresholdBundleHandle add(E e) {
final Lock lock = this.lock;
lock.lock();
try {
boolean signal = false;
// TODO verify invariant: bundleStopwatch == null iff size() == 0
if (data.size() == 0) {
bundleStopwatch = Stopwatch.createStarted();
// we want to trigger the signal so that we switch the await from an unbounded
// await to a time-bounded await.
signal = true;
for (ExternalThreshold<E> threshold : externalThresholds) {
threshold.startBundle();
}
validateLimits(e);

boolean signalBundleIsReady = false;
Bundle bundleOfAddedItem = null;
if (currentOpenBundle == null) {
currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay);
currentOpenBundle.start();
signalBundleIsReady = true;
}
data.add(e);
if (!bundleReady) {
for (BundlingThreshold<E> threshold : thresholds) {
threshold.accumulate(e);
if (threshold.isThresholdReached()) {
bundleReady = true;
signal = true;
break;
}

if (currentOpenBundle.canAccept(e)) {
currentOpenBundle.add(e);
bundleOfAddedItem = currentOpenBundle;
if (currentOpenBundle.isAnyThresholdReached()) {
signalBundleIsReady = true;
closedBundles.add(currentOpenBundle);
currentOpenBundle = null;
}
} else {
signalBundleIsReady = true;
closedBundles.add(currentOpenBundle);
currentOpenBundle = new Bundle(thresholdPrototypes, externalThresholdPrototypes, maxDelay);
currentOpenBundle.start();
currentOpenBundle.add(e);
bundleOfAddedItem = currentOpenBundle;
}
if (signal) {

if (signalBundleIsReady) {
bundleCondition.signalAll();
}
return currentBundleHandle;
return bundleOfAddedItem;
} finally {
lock.unlock();
}
Expand All @@ -193,7 +195,10 @@ public void flush() {
final Lock lock = this.lock;
lock.lock();
try {
bundleReady = true;
if (currentOpenBundle != null) {
closedBundles.add(currentOpenBundle);
currentOpenBundle = null;
}
bundleCondition.signalAll();
} finally {
lock.unlock();
Expand All @@ -206,23 +211,24 @@ public void flush() {
*
* @return the number of items added to 'bundle'.
*/
public int drainTo(Collection<? super E> bundle) {
public int drainNextBundleTo(Collection<? super E> outputCollection) {
final Lock lock = this.lock;
lock.lock();
try {
int dataSize = data.size();

bundle.addAll(data);
data.clear();
currentBundleHandle = new BundleHandle(externalThresholds);

thresholds = copyResetThresholds(thresholds);
externalThresholds = copyResetExternalThresholds(externalThresholds);

bundleStopwatch = null;
bundleReady = false;
Bundle outBundle = null;
if (closedBundles.size() > 0) {
outBundle = closedBundles.remove(0);
} else if (currentOpenBundle != null) {
outBundle = currentOpenBundle;
currentOpenBundle = null;
}

return dataSize;
if (outBundle != null) {
outputCollection.addAll(outBundle.getData());
return outputCollection.size();
} else {
return 0;
}
} finally {
lock.unlock();
}
Expand All @@ -236,66 +242,61 @@ public List<E> takeBundle() throws InterruptedException {
lock.lockInterruptibly();
try {
while (shouldWait()) {
if (data.size() == 0 || maxDelay == null) {
if (currentOpenBundle == null || maxDelay == null) {
// if an element gets added, this will be signaled, then we will re-check the while-loop
// condition to see if the delay or other thresholds have been exceeded,
// and if none of these are true, then we will arrive at the time-bounded
// await in the else clause.
bundleCondition.await();
} else {
bundleCondition.await(getDelayLeft().getMillis(), TimeUnit.MILLISECONDS);
bundleCondition.await(currentOpenBundle.getDelayLeft().getMillis(),
TimeUnit.MILLISECONDS);
}
}

List<E> bundle = new ArrayList<>();
drainTo(bundle);
drainNextBundleTo(bundle);
return bundle;
} finally {
lock.unlock();
}
}

/**
* Returns the number of elements queued up in the bundler.
*/
public int size() {
public boolean isEmpty() {
final Lock lock = this.lock;
lock.lock();
try {
return data.size();
if (closedBundles.size() > 0) {
return false;
}
if (currentOpenBundle != null) {
return false;
}
return true;
} finally {
lock.unlock();
}
}

/**
* Returns the elements queued up in the bundler.
*/
public Object[] toArray() {
final Lock lock = this.lock;
lock.lock();
try {
return data.toArray();
} finally {
lock.unlock();
private void validateLimits(E e) {
for (BundlingThreshold<E> threshold : thresholdPrototypes) {
if (!threshold.canAccept(e)) {
throw new IllegalArgumentException("Single item too large for bundle");
}
}
}

private boolean shouldWait() {
if (data.size() == 0) {
return true;
}
if (bundleReady) {
if (closedBundles.size() > 0) {
return false;
}
if (currentOpenBundle == null) {
return true;
}
if (maxDelay == null) {
return true;
}
return getDelayLeft().getMillis() > 0;
}

// pre-condition: data.size() > 0 ( === bundleStopwatch != null)
private Duration getDelayLeft() {
return Duration.millis(maxDelay.getMillis() - bundleStopwatch.elapsed(TimeUnit.MILLISECONDS));
return currentOpenBundle.getDelayLeft().getMillis() > 0;
}

private static <E> ImmutableList<BundlingThreshold<E>> copyResetThresholds(
Expand Down Expand Up @@ -324,11 +325,59 @@ private static <E> ImmutableList<ExternalThreshold<E>> copyResetExternalThreshol
* a ThresholdBundler, but only if the bundle referenced is still the active
* one.
*/
private class BundleHandle implements ThresholdBundleHandle {
private class Bundle implements ThresholdBundleHandle {
private final ImmutableList<BundlingThreshold<E>> thresholds;
private final ImmutableList<ExternalThreshold<E>> externalThresholds;
private final Duration maxDelay;
private final List<E> data = new ArrayList<>();
private Stopwatch stopwatch;

private Bundle(ImmutableList<BundlingThreshold<E>> thresholds,
ImmutableList<ExternalThreshold<E>> externalThresholds,
Duration maxDelay) {
this.thresholds = copyResetThresholds(thresholds);
this.externalThresholds = copyResetExternalThresholds(externalThresholds);
this.maxDelay = maxDelay;
}

private BundleHandle(ImmutableList<ExternalThreshold<E>> externalThresholds) {
this.externalThresholds = externalThresholds;
private void start() {
stopwatch = Stopwatch.createStarted();
for (ExternalThreshold<E> threshold : externalThresholds) {
threshold.startBundle();
}
}

private boolean canAccept(E e) {
for (BundlingThreshold<E> threshold : thresholds) {
if (!threshold.canAccept(e)) {
return false;
}
}
return true;
}

private void add(E e) {
data.add(e);
for (BundlingThreshold<E> threshold : thresholds) {
threshold.accumulate(e);
}
}

private List<E> getData() {
return data;
}

private Duration getDelayLeft() {
return Duration.millis(maxDelay.getMillis() - stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

private boolean isAnyThresholdReached() {
for (BundlingThreshold<E> threshold : thresholds) {
if (threshold.isThresholdReached()) {
return true;
}
}
return false;
}

@Override
Expand All @@ -351,7 +400,7 @@ public void flush() {
lock.lock();

try {
if (ThresholdBundler.this.currentBundleHandle != this) {
if (ThresholdBundler.this.currentOpenBundle != this) {
return;
}
ThresholdBundler.this.flush();
Expand Down
Loading

0 comments on commit a02a695

Please sign in to comment.