From 5f0cdb8c4edcdd909fc4d3694a87cd36da21328d Mon Sep 17 00:00:00 2001 From: Michael Bausor Date: Thu, 2 Mar 2017 21:44:20 -0800 Subject: [PATCH 1/5] Update to accomodate changes to gax --- .../spi/v2/LoggingServiceV2Settings.java | 10 +++- google-cloud-pubsub/.classpath | 26 +++++++++ .../pubsub/spi/v1/MessageDispatcher.java | 2 +- .../spi/v1/PollingSubscriberConnection.java | 2 +- .../google/cloud/pubsub/spi/v1/Publisher.java | 55 ++++++------------- .../pubsub/spi/v1/PublisherSettings.java | 10 +++- .../spi/v1/StreamingSubscriberConnection.java | 2 +- .../cloud/pubsub/spi/v1/Subscriber.java | 6 +- .../pubsub/spi/v1/PublisherImplTest.java | 15 +++-- .../vision/spi/v1/ImageAnnotatorClient.java | 6 +- .../cloud/vision/spi/v1/package-info.java | 6 +- 11 files changed, 80 insertions(+), 60 deletions(-) create mode 100644 google-cloud-pubsub/.classpath diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java index adf9ee22181f..bd796788ad6f 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java @@ -20,11 +20,13 @@ import static com.google.cloud.logging.spi.v2.PagedResponseWrappers.ListMonitoredResourceDescriptorsPagedResponse; import com.google.api.MonitoredResourceDescriptor; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.core.RetrySettings; import com.google.api.gax.grpc.BundlingCallSettings; import com.google.api.gax.grpc.BundlingDescriptor; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.CallContext; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ClientSettings; @@ -558,7 +560,11 @@ private static Builder createDefault() { .getBundlingSettingsBuilder() .setElementCountThreshold(100) .setRequestByteThreshold(1024) - .setDelayThreshold(Duration.millis(10)); + .setDelayThreshold(Duration.millis(10)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .build()); builder .writeLogEntriesSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")) diff --git a/google-cloud-pubsub/.classpath b/google-cloud-pubsub/.classpath new file mode 100644 index 000000000000..f619a5369d9d --- /dev/null +++ b/google-cloud-pubsub/.classpath @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 9a80cbadb1bd..7736b5d45103 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,7 +16,7 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.grpc.FlowController; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.cloud.Clock; import com.google.common.annotations.VisibleForTesting; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 3bbf8ed02467..d9a57b7c33b7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -18,7 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable; -import com.google.api.gax.grpc.FlowController; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.cloud.Clock; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index e72cae3bd7a9..a1424967e901 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -16,15 +16,15 @@ package com.google.cloud.pubsub.spi.v1; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController; import com.google.api.gax.core.Function; import com.google.api.gax.core.RetrySettings; import com.google.api.gax.core.RpcFuture; import com.google.api.gax.core.RpcFutureCallback; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; -import com.google.api.gax.grpc.FlowControlSettings; -import com.google.api.gax.grpc.FlowController; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.gax.grpc.RpcFutures; import com.google.auth.oauth2.GoogleCredentials; @@ -88,7 +88,6 @@ public class Publisher { private final LongRandom longRandom; private final FlowControlSettings flowControlSettings; - private final boolean failOnFlowControlLimits; private final Lock messagesBundleLock; private List messagesBundle; @@ -125,8 +124,7 @@ private Publisher(Builder builder) throws IOException { this.longRandom = builder.longRandom; flowControlSettings = builder.flowControlSettings; - failOnFlowControlLimits = builder.failOnFlowControlLimits; - this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); + this.flowController = new FlowController(flowControlSettings); messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); @@ -173,12 +171,14 @@ public TopicName getTopicName() { * Schedules the publishing of a message. The publishing of the message may occur immediately or * be delayed based on the publisher bundling options. * - *

Depending on chosen flow control {@link #failOnFlowControlLimits option}, the returned - * future might immediately fail with a {@link FlowController.FlowControlException} or block the - * current thread until there are more resources available to publish. + *

Depending on chosen flow control {@link FlowControlSettings#getLimitExceededBehavior + * option}, the returned future might immediately fail with a {@link + * FlowController.FlowControlException} or block the current thread until there are more resources + * available to publish. * *

Example of publishing a message. - *

 {@code
+   *
+   * 
{@code
    * String message = "my_message";
    * ByteString data = ByteString.copyFromUtf8(message);
    * PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
@@ -473,24 +473,17 @@ private long getMaxBundleBytes() {
   }
 
   /**
-   * The bundling settings configured on this {@code Publisher}. See {@link
-   * #failOnFlowControlLimits()}.
-   */
-  public FlowControlSettings getFlowControlSettings() {
-    return flowControlSettings;
-  }
-
-  /**
-   * Whether to block publish calls when reaching flow control limits (see {@link
-   * #getFlowControlSettings()}).
+   * The bundling settings configured on this {@code Publisher}, including whether to block publish
+   * calls when reaching flow control limits.
    *
-   * 

If set to false, a publish call will fail with either {@link - * FlowController.MaxOutstandingRequestBytesReachedException} or {@link + *

If {@link FlowControlSettings#getLimitExceededBehavior()} is set to {@link + * FlowController.LimitExceededBehavior#ThrowException}, a publish call will fail with either + * {@link FlowController.MaxOutstandingRequestBytesReachedException} or {@link * FlowController.MaxOutstandingElementCountReachedException}, as appropriate, when flow control * limits are reached. */ - public boolean failOnFlowControlLimits() { - return failOnFlowControlLimits; + public FlowControlSettings getFlowControlSettings() { + return flowControlSettings; } /** @@ -619,7 +612,6 @@ public long nextLong(long least, long bound) { // Client-side flow control options FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); - boolean failOnFlowControlLimits; RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; LongRandom longRandom = DEFAULT_LONG_RANDOM; @@ -665,19 +657,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { return this; } - /** - * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * FlowController.MaxOutstandingRequestBytesReachedException} or {@link - * FlowController.MaxOutstandingElementCountReachedException} as appropriate. - * - *

If set to false, then publish operations will block the current thread until the - * outstanding requests go under the limits. - */ - public Builder setFailOnFlowControlLimits(boolean fail) { - failOnFlowControlLimits = fail; - return this; - } - /** Configures the Publisher's retry parameters. */ public Builder setRetrySettings(RetrySettings retrySettings) { Preconditions.checkArgument( diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java index 535487c4a7f3..e0c6cb644da9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java @@ -18,11 +18,13 @@ import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicSubscriptionsPagedResponse; import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicsPagedResponse; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.core.RetrySettings; import com.google.api.gax.grpc.BundlingCallSettings; import com.google.api.gax.grpc.BundlingDescriptor; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.CallContext; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ClientSettings; @@ -539,7 +541,11 @@ private static Builder createDefault() { .getBundlingSettingsBuilder() .setElementCountThreshold(10) .setRequestByteThreshold(1024) - .setDelayThreshold(Duration.millis(10)); + .setDelayThreshold(Duration.millis(10)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .build()); builder .publishSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery")) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 6b4b0ef495a0..6cdadd95c728 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -18,7 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable; -import com.google.api.gax.grpc.FlowController; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.cloud.Clock; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 902226db408f..8133e4829a68 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -16,9 +16,9 @@ package com.google.cloud.pubsub.spi.v1; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController; import com.google.api.gax.grpc.ExecutorProvider; -import com.google.api.gax.grpc.FlowControlSettings; -import com.google.api.gax.grpc.FlowController; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; import com.google.auth.Credentials; @@ -283,7 +283,7 @@ private SubscriberImpl(Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock(); - flowController = new FlowController(builder.flowControlSettings, false); + flowController = new FlowController(builder.flowControlSettings); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index d078c496f475..51087cccf1e7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -18,17 +18,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.RpcFuture; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedExecutorProvider; -import com.google.api.gax.grpc.FlowControlSettings; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.cloud.pubsub.spi.v1.Publisher.Builder; import com.google.protobuf.ByteString; @@ -373,7 +373,6 @@ public void testPublisherGetters() throws Exception { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); builder.setChannelProvider(TEST_CHANNEL_PROVIDER); builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); - builder.setFailOnFlowControlLimits(true); builder.setBundlingSettings( BundlingSettings.newBuilder() .setRequestByteThreshold(10) @@ -384,6 +383,7 @@ public void testPublisherGetters() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingRequestBytes(13) .setMaxOutstandingElementCount(14) + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) .build()); Publisher publisher = builder.build(); @@ -393,7 +393,9 @@ public void testPublisherGetters() throws Exception { assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold()); assertEquals(13, (long) publisher.getFlowControlSettings().getMaxOutstandingRequestBytes()); assertEquals(14, (long) publisher.getFlowControlSettings().getMaxOutstandingElementCount()); - assertTrue(publisher.failOnFlowControlLimits()); + assertEquals( + LimitExceededBehavior.ThrowException, + publisher.getFlowControlSettings().getLimitExceededBehavior()); publisher.shutdown(); } @@ -402,7 +404,8 @@ public void testBuilderParametersAndDefaults() { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); assertEquals(TEST_TOPIC, builder.topicName); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); - assertFalse(builder.failOnFlowControlLimits); + assertEquals( + LimitExceededBehavior.Block, builder.flowControlSettings.getLimitExceededBehavior()); assertEquals( Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD, builder.bundlingSettings.getRequestByteThreshold().longValue()); diff --git a/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java b/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java index f6fbd37e2785..40643843e0fb 100644 --- a/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java +++ b/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/ImageAnnotatorClient.java @@ -31,9 +31,9 @@ // AUTO-GENERATED DOCUMENTATION AND SERVICE /** - * Service Description: Service that performs Google Cloud Vision API detection tasks, such as face, - * landmark, logo, label, and text detection, over client images, and returns detected entities from - * the images. + * Service Description: Service that performs Google Cloud Vision API detection tasks over client + * images, such as face, landmark, logo, label, and text detection. The ImageAnnotator service + * returns detected entities from the images. * *

This class provides the ability to make remote calls to the backing service through method * calls that map to API methods. Sample code to get started: diff --git a/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/package-info.java b/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/package-info.java index c866929a9e6f..ea65fee3e3b4 100644 --- a/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/package-info.java +++ b/google-cloud-vision/src/main/java/com/google/cloud/vision/spi/v1/package-info.java @@ -21,9 +21,9 @@ * *

==================== ImageAnnotatorClient ==================== * - *

Service Description: Service that performs Google Cloud Vision API detection tasks, such as - * face, landmark, logo, label, and text detection, over client images, and returns detected - * entities from the images. + *

Service Description: Service that performs Google Cloud Vision API detection tasks over client + * images, such as face, landmark, logo, label, and text detection. The ImageAnnotator service + * returns detected entities from the images. * *

Sample for ImageAnnotatorClient: * From 1fe8f46731f2427aa966e64192bd3f39d762d501 Mon Sep 17 00:00:00 2001 From: Michael Bausor Date: Thu, 2 Mar 2017 21:46:06 -0800 Subject: [PATCH 2/5] Update pom.xml --- google-cloud-pubsub/.classpath | 26 -------------------------- pom.xml | 2 +- 2 files changed, 1 insertion(+), 27 deletions(-) delete mode 100644 google-cloud-pubsub/.classpath diff --git a/google-cloud-pubsub/.classpath b/google-cloud-pubsub/.classpath deleted file mode 100644 index f619a5369d9d..000000000000 --- a/google-cloud-pubsub/.classpath +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/pom.xml b/pom.xml index ee1e97b05549..0c8e55df72ce 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.1.2 + 0.1.3 0.1.5 0.9.4-alpha-SNAPSHOT 0.9.4-beta-SNAPSHOT From f0923c899dd24e81481daed71f00da578a3f4413 Mon Sep 17 00:00:00 2001 From: Michael Bausor Date: Mon, 6 Mar 2017 10:01:28 -0800 Subject: [PATCH 3/5] Regenerate with changes to bundling --- .../spi/v2/LoggingServiceV2Settings.java | 58 +++++++++---------- .../pubsub/spi/v1/PublisherSettings.java | 42 ++++++++------ pom.xml | 2 +- 3 files changed, 52 insertions(+), 50 deletions(-) diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java index bd796788ad6f..4bf24aa9c5d2 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java @@ -21,10 +21,12 @@ import com.google.api.MonitoredResourceDescriptor; import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.bundling.RequestBuilder; import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.grpc.BundledRequestIssuer; import com.google.api.gax.grpc.BundlingCallSettings; import com.google.api.gax.grpc.BundlingDescriptor; import com.google.api.gax.grpc.CallContext; @@ -36,7 +38,6 @@ import com.google.api.gax.grpc.PagedCallSettings; import com.google.api.gax.grpc.PagedListDescriptor; import com.google.api.gax.grpc.PagedListResponseFactory; -import com.google.api.gax.grpc.RequestIssuer; import com.google.api.gax.grpc.SimpleCallSettings; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.api.gax.grpc.UnaryCallable; @@ -60,9 +61,7 @@ import com.google.protobuf.ExperimentalApi; import io.grpc.Status; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import javax.annotation.Generated; import org.joda.time.Duration; @@ -400,33 +399,32 @@ public String getBundlePartitionKey(WriteLogEntriesRequest request) { } @Override - public WriteLogEntriesRequest mergeRequests( - Collection requests) { - WriteLogEntriesRequest firstRequest = requests.iterator().next(); - - List elements = new ArrayList<>(); - for (WriteLogEntriesRequest request : requests) { - elements.addAll(request.getEntriesList()); - } - - WriteLogEntriesRequest bundleRequest = - WriteLogEntriesRequest.newBuilder() - .setLogName(firstRequest.getLogName()) - .setResource(firstRequest.getResource()) - .putAllLabels(firstRequest.getLabels()) - .addAllEntries(elements) - .build(); - return bundleRequest; + public RequestBuilder getRequestBuilder() { + return new RequestBuilder() { + private WriteLogEntriesRequest.Builder builder; + + @Override + public void appendRequest(WriteLogEntriesRequest request) { + if (builder == null) { + builder = request.toBuilder(); + } else { + builder.addAllEntries(request.getEntriesList()); + } + } + + @Override + public WriteLogEntriesRequest build() { + return builder.build(); + } + }; } @Override public void splitResponse( WriteLogEntriesResponse bundleResponse, - Collection> - bundle) { + Collection> bundle) { int bundleMessageIndex = 0; - for (RequestIssuer responder : - bundle) { + for (BundledRequestIssuer responder : bundle) { WriteLogEntriesResponse response = WriteLogEntriesResponse.newBuilder().build(); responder.setResponse(response); } @@ -435,10 +433,8 @@ public void splitResponse( @Override public void splitException( Throwable throwable, - Collection> - bundle) { - for (RequestIssuer responder : - bundle) { + Collection> bundle) { + for (BundledRequestIssuer responder : bundle) { responder.setException(throwable); } } @@ -559,11 +555,13 @@ private static Builder createDefault() { .writeLogEntriesSettings() .getBundlingSettingsBuilder() .setElementCountThreshold(100) - .setRequestByteThreshold(1024) + .setRequestByteThreshold(20480) .setDelayThreshold(Duration.millis(10)) .setFlowControlSettings( FlowControlSettings.newBuilder() - .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .setMaxOutstandingElementCount(10000) + .setMaxOutstandingRequestBytes(2048000) + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) .build()); builder .writeLogEntriesSettings() diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java index e0c6cb644da9..82813efbfef5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java @@ -19,10 +19,12 @@ import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicsPagedResponse; import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.bundling.RequestBuilder; import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.grpc.BundledRequestIssuer; import com.google.api.gax.grpc.BundlingCallSettings; import com.google.api.gax.grpc.BundlingDescriptor; import com.google.api.gax.grpc.CallContext; @@ -34,7 +36,6 @@ import com.google.api.gax.grpc.PagedCallSettings; import com.google.api.gax.grpc.PagedListDescriptor; import com.google.api.gax.grpc.PagedListResponseFactory; -import com.google.api.gax.grpc.RequestIssuer; import com.google.api.gax.grpc.SimpleCallSettings; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.api.gax.grpc.UnaryCallable; @@ -60,7 +61,6 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.Topic; import io.grpc.Status; import java.io.IOException; @@ -365,30 +365,34 @@ public String getBundlePartitionKey(PublishRequest request) { } @Override - public PublishRequest mergeRequests(Collection requests) { - PublishRequest firstRequest = requests.iterator().next(); + public RequestBuilder getRequestBuilder() { + return new RequestBuilder() { + private PublishRequest.Builder builder; - List elements = new ArrayList<>(); - for (PublishRequest request : requests) { - elements.addAll(request.getMessagesList()); - } + @Override + public void appendRequest(PublishRequest request) { + if (builder == null) { + builder = request.toBuilder(); + } else { + builder.addAllMessages(request.getMessagesList()); + } + } - PublishRequest bundleRequest = - PublishRequest.newBuilder() - .setTopic(firstRequest.getTopic()) - .addAllMessages(elements) - .build(); - return bundleRequest; + @Override + public PublishRequest build() { + return builder.build(); + } + }; } @Override public void splitResponse( PublishResponse bundleResponse, - Collection> bundle) { + Collection> bundle) { int bundleMessageIndex = 0; - for (RequestIssuer responder : bundle) { + for (BundledRequestIssuer responder : bundle) { List subresponseElements = new ArrayList<>(); - int subresponseCount = responder.getRequest().getMessagesCount(); + long subresponseCount = responder.getMessageCount(); for (int i = 0; i < subresponseCount; i++) { subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex)); bundleMessageIndex += 1; @@ -402,8 +406,8 @@ public void splitResponse( @Override public void splitException( Throwable throwable, - Collection> bundle) { - for (RequestIssuer responder : bundle) { + Collection> bundle) { + for (BundledRequestIssuer responder : bundle) { responder.setException(throwable); } } diff --git a/pom.xml b/pom.xml index 0c8e55df72ce..055411a08fad 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.1.3 + 0.1.5 0.1.5 0.9.4-alpha-SNAPSHOT 0.9.4-beta-SNAPSHOT From 6b4d9eb77d58c941cc5f83f6cdc0823f183ae4b4 Mon Sep 17 00:00:00 2001 From: Michael Bausor Date: Mon, 6 Mar 2017 10:07:26 -0800 Subject: [PATCH 4/5] Bump GAX to 0.2.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5a38a42da3b7..871da038417e 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.1.5 + 0.2.0 0.1.5 0.9.4-alpha-SNAPSHOT 0.9.4-beta-SNAPSHOT From d9892d580fc2ef31aed39390566811490d1a6346 Mon Sep 17 00:00:00 2001 From: Michael Bausor Date: Tue, 7 Mar 2017 09:14:56 -0800 Subject: [PATCH 5/5] Remove smoke test, update settings, bump GAX version to 0.3.0 --- .../spi/v2/LoggingServiceV2Settings.java | 10 +-- .../pubsub/spi/v1/PublisherSmokeTest.java | 68 ------------------- pom.xml | 2 +- 3 files changed, 6 insertions(+), 74 deletions(-) delete mode 100644 google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherSmokeTest.java diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java index 4bf24aa9c5d2..08a31bafc375 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java @@ -554,13 +554,13 @@ private static Builder createDefault() { builder .writeLogEntriesSettings() .getBundlingSettingsBuilder() - .setElementCountThreshold(100) - .setRequestByteThreshold(20480) - .setDelayThreshold(Duration.millis(10)) + .setElementCountThreshold(1000) + .setRequestByteThreshold(1048576) + .setDelayThreshold(Duration.millis(50)) .setFlowControlSettings( FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(10000) - .setMaxOutstandingRequestBytes(2048000) + .setMaxOutstandingElementCount(100000) + .setMaxOutstandingRequestBytes(10485760) .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) .build()); builder diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherSmokeTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherSmokeTest.java deleted file mode 100644 index 004d9580cb6d..000000000000 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherSmokeTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2017, Google Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.pubsub.spi.v1; - -import com.google.pubsub.v1.Topic; -import com.google.pubsub.v1.TopicName; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.lang.builder.ReflectionToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; - -@javax.annotation.Generated("by GAPIC") -public class PublisherSmokeTest { - public static void main(String args[]) { - Logger.getLogger("").setLevel(Level.WARNING); - try { - Options options = new Options(); - options.addOption("h", "help", false, "show usage"); - options.addOption( - Option.builder() - .longOpt("project_id") - .desc("Project id") - .hasArg() - .argName("PROJECT-ID") - .required(true) - .build()); - CommandLine cl = (new DefaultParser()).parse(options, args); - if (cl.hasOption("help")) { - HelpFormatter formater = new HelpFormatter(); - formater.printHelp("PublisherSmokeTest", options); - } - executeNoCatch(cl.getOptionValue("project_id")); - System.out.println("OK"); - } catch (Exception e) { - System.err.println("Failed with exception:"); - e.printStackTrace(System.err); - System.exit(1); - } - } - - public static void executeNoCatch(String projectId) throws Exception { - try (PublisherClient client = PublisherClient.create()) { - TopicName name = TopicName.create(projectId, "smoketesttopic-" + System.currentTimeMillis()); - - Topic response = client.createTopic(name); - System.out.println( - ReflectionToStringBuilder.toString(response, ToStringStyle.MULTI_LINE_STYLE)); - } - } -} diff --git a/pom.xml b/pom.xml index ee844891e030..207164f29494 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.2.0 + 0.3.0 0.1.5 0.9.5-alpha-SNAPSHOT 0.9.5-beta-SNAPSHOT