diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 480a2d202c61..59ca6721ef36 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -446,7 +446,7 @@ class BeamModulePlugin implements Plugin { def errorprone_version = "2.3.4" def google_clients_version = "1.31.0" def google_cloud_bigdataoss_version = "2.2.2" - def google_cloud_pubsublite_version = "1.0.4" + def google_cloud_pubsublite_version = "0.13.2" def google_code_gson_version = "2.8.6" def google_oauth_clients_version = "1.31.0" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index bf6a28863b01..b0cc68193184 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -24,36 +24,25 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. */ -public final class PubsubMessages { - private PubsubMessages() {} - - public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { - Map attributes = input.getAttributeMap(); - com.google.pubsub.v1.PubsubMessage.Builder message = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(input.getPayload())); - // TODO(BEAM-8085) this should not be null - if (attributes != null) { - message.putAllAttributes(attributes); - } - String messageId = input.getMessageId(); - if (messageId != null) { - message.setMessageId(messageId); - } - return message.build(); - } - - public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) { - return new PubsubMessage( - input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId()); - } - +public class PubsubMessages { // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. public static class ParsePayloadAsPubsubMessageProto implements SerializableFunction { @Override public byte[] apply(PubsubMessage input) { - return toProto(input).toByteArray(); + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + String messageId = input.getMessageId(); + if (messageId != null) { + message.setMessageId(messageId); + } + return message.build().toByteArray(); } } @@ -65,7 +54,8 @@ public PubsubMessage apply(byte[] input) { try { com.google.pubsub.v1.PubsubMessage message = com.google.pubsub.v1.PubsubMessage.parseFrom(input); - return fromProto(message); + return new PubsubMessage( + message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId()); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("Could not decode Pubsub message", e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java new file mode 100644 index 000000000000..6dc15166666a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message + * types. + */ +public final class CloudPubsubChecks { + private CloudPubsubChecks() {} + + /** + * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the + * standard transformation methods in the client library. + * + *

Will fail the pipeline if a message has multiple attributes per key. + */ + public static PTransform, PCollection> + ensureUsableAsCloudPubsub() { + return MapElements.into(TypeDescriptor.of(PubSubMessage.class)) + .via( + message -> { + Object unused = toCpsPublishTransformer().transform(Message.fromProto(message)); + return message; + }); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java deleted file mode 100644 index 1140c11c2767..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; -import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; -import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; - -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; -import com.google.cloud.pubsublite.proto.PubSubMessage; -import com.google.cloud.pubsublite.proto.SequencedMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. */ -public final class CloudPubsubTransforms { - private CloudPubsubTransforms() {} - /** - * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the - * standard transformation methods in the client library. - * - *

Will fail the pipeline if a message has multiple attributes per key. - */ - public static PTransform, PCollection> - ensureUsableAsCloudPubsub() { - return new PTransform, PCollection>() { - @Override - public PCollection expand(PCollection input) { - return input.apply( - MapElements.into(TypeDescriptor.of(PubSubMessage.class)) - .via( - message -> { - Object unused = - toCpsPublishTransformer().transform(Message.fromProto(message)); - return message; - })); - } - }; - } - - /** - * Transform messages read from Pub/Sub Lite to their equivalent Cloud Pub/Sub Message that would - * have been read from PubsubIO. - * - *

Will fail the pipeline if a message has multiple attributes per map key. - */ - public static PTransform, PCollection> - toCloudPubsubMessages() { - return new PTransform, PCollection>() { - @Override - public PCollection expand(PCollection input) { - return input.apply( - MapElements.into(TypeDescriptor.of(PubsubMessage.class)) - .via( - message -> - PubsubMessages.fromProto( - toCpsSubscribeTransformer() - .transform( - com.google.cloud.pubsublite.SequencedMessage.fromProto( - message))))); - } - }; - } - - /** - * Transform messages publishable using PubsubIO to their equivalent Pub/Sub Lite publishable - * message. - */ - public static PTransform, PCollection> - fromCloudPubsubMessages() { - return new PTransform, PCollection>() { - @Override - public PCollection expand(PCollection input) { - return input.apply( - MapElements.into(TypeDescriptor.of(PubSubMessage.class)) - .via( - message -> - fromCpsPublishTransformer(KeyExtractor.DEFAULT) - .transform(PubsubMessages.toProto(message)) - .toProto())); - } - }; - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java deleted file mode 100644 index de0cf433ff33..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import java.io.Serializable; - -/** - * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers - * when it is itself closed. - * - *

close() should never be called on produced readers. - */ -public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable { - TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition); - - @Override - void close(); -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java deleted file mode 100644 index 9a337bfdb784..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import com.google.api.gax.rpc.ApiException; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.sdk.transforms.SerializableFunction; - -public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory { - private final SerializableFunction newReader; - - @GuardedBy("this") - private final Map readers = new HashMap<>(); - - ManagedBacklogReaderFactoryImpl( - SerializableFunction newReader) { - this.newReader = newReader; - } - - private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader { - private final TopicBacklogReader underlying; - - NonCloseableTopicBacklogReader(TopicBacklogReader underlying) { - this.underlying = underlying; - } - - @Override - public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException { - return underlying.computeMessageStats(offset); - } - - @Override - public void close() { - throw new IllegalArgumentException( - "Cannot call close() on a reader returned from ManagedBacklogReaderFactory."); - } - } - - @Override - public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { - return new NonCloseableTopicBacklogReader( - readers.computeIfAbsent(subscriptionPartition, newReader::apply)); - } - - @Override - public synchronized void close() { - readers.values().forEach(TopicBacklogReader::close); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java deleted file mode 100644 index b39d87e6e1f0..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import com.google.auto.value.AutoValue; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.io.range.OffsetRange; - -@AutoValue -@DefaultCoder(OffsetByteRangeCoder.class) -abstract class OffsetByteRange { - abstract OffsetRange getRange(); - - abstract long getByteCount(); - - static OffsetByteRange of(OffsetRange range, long byteCount) { - return new AutoValue_OffsetByteRange(range, byteCount); - } - - static OffsetByteRange of(OffsetRange range) { - return of(range, 0); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java deleted file mode 100644 index 076cda13e193..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderProvider; -import org.apache.beam.sdk.coders.CoderProviders; -import org.apache.beam.sdk.coders.DelegateCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TypeDescriptor; - -public class OffsetByteRangeCoder extends AtomicCoder { - private static final Coder CODER = - DelegateCoder.of( - KvCoder.of(OffsetRange.Coder.of(), VarLongCoder.of()), - OffsetByteRangeCoder::toKv, - OffsetByteRangeCoder::fromKv); - - private static KV toKv(OffsetByteRange value) { - return KV.of(value.getRange(), value.getByteCount()); - } - - private static OffsetByteRange fromKv(KV kv) { - return OffsetByteRange.of(kv.getKey(), kv.getValue()); - } - - @Override - public void encode(OffsetByteRange value, OutputStream outStream) throws IOException { - CODER.encode(value, outStream); - } - - @Override - public OffsetByteRange decode(InputStream inStream) throws IOException { - return CODER.decode(inStream); - } - - public static CoderProvider getCoderProvider() { - return CoderProviders.forCoder( - TypeDescriptor.of(OffsetByteRange.class), new OffsetByteRangeCoder()); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java index da9aaaa03ac3..608af8fea189 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; import org.joda.time.Duration; @@ -42,33 +44,35 @@ * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it * would return ProcessContinuation.resume(). */ -class OffsetByteRangeTracker extends TrackerWithProgress { - private final TopicBacklogReader unownedBacklogReader; +class OffsetByteRangeTracker extends RestrictionTracker + implements HasProgress { + private final TopicBacklogReader backlogReader; private final Duration minTrackingTime; private final long minBytesReceived; private final Stopwatch stopwatch; - private OffsetByteRange range; + private OffsetRange range; private @Nullable Long lastClaimed; + private long byteCount = 0; public OffsetByteRangeTracker( - OffsetByteRange range, - TopicBacklogReader unownedBacklogReader, + OffsetRange range, + TopicBacklogReader backlogReader, Stopwatch stopwatch, Duration minTrackingTime, long minBytesReceived) { - checkArgument( - range.getRange().getTo() == Long.MAX_VALUE, - "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); - checkArgument( - range.getByteCount() == 0L, - "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); - this.unownedBacklogReader = unownedBacklogReader; + checkArgument(range.getTo() == Long.MAX_VALUE); + this.backlogReader = backlogReader; this.minTrackingTime = minTrackingTime; this.minBytesReceived = minBytesReceived; this.stopwatch = stopwatch.reset().start(); this.range = range; } + @Override + public void finalize() { + this.backlogReader.close(); + } + @Override public IsBounded isBounded() { return IsBounded.UNBOUNDED; @@ -83,32 +87,32 @@ public boolean tryClaim(OffsetByteProgress position) { position.lastOffset().value(), lastClaimed); checkArgument( - toClaim >= range.getRange().getFrom(), + toClaim >= range.getFrom(), "Trying to claim offset %s before start of the range %s", toClaim, range); // split() has already been called, truncating this range. No more offsets may be claimed. - if (range.getRange().getTo() != Long.MAX_VALUE) { - boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom(); - boolean isValidClosedRange = nextOffset() == range.getRange().getTo(); + if (range.getTo() != Long.MAX_VALUE) { + boolean isRangeEmpty = range.getTo() == range.getFrom(); + boolean isValidClosedRange = nextOffset() == range.getTo(); checkState( isRangeEmpty || isValidClosedRange, "Violated class precondition: offset range improperly split. Please report a beam bug."); return false; } lastClaimed = toClaim; - range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes()); + byteCount += position.batchBytes(); return true; } @Override - public OffsetByteRange currentRestriction() { + public OffsetRange currentRestriction() { return range; } private long nextOffset() { checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE); - return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1; + return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1; } /** @@ -120,33 +124,29 @@ private boolean receivedEnough() { if (duration.isLongerThan(minTrackingTime)) { return true; } - if (currentRestriction().getByteCount() >= minBytesReceived) { + if (byteCount >= minBytesReceived) { return true; } return false; } @Override - public @Nullable SplitResult trySplit(double fractionOfRemainder) { + public @Nullable SplitResult trySplit(double fractionOfRemainder) { // Cannot split a bounded range. This should already be completely claimed. - if (range.getRange().getTo() != Long.MAX_VALUE) { + if (range.getTo() != Long.MAX_VALUE) { return null; } if (!receivedEnough()) { return null; } - range = - OffsetByteRange.of( - new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()), - range.getByteCount()); - return SplitResult.of( - this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0)); + range = new OffsetRange(currentRestriction().getFrom(), nextOffset()); + return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE)); } @Override @SuppressWarnings("unboxing.of.nullable") public void checkDone() throws IllegalStateException { - if (range.getRange().getFrom() == range.getRange().getTo()) { + if (range.getFrom() == range.getTo()) { return; } checkState( @@ -155,18 +155,18 @@ public void checkDone() throws IllegalStateException { range); long lastClaimedNotNull = checkNotNull(lastClaimed); checkState( - lastClaimedNotNull >= range.getRange().getTo() - 1, + lastClaimedNotNull >= range.getTo() - 1, "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted", lastClaimedNotNull, range, lastClaimedNotNull + 1, - range.getRange().getTo()); + range.getTo()); } @Override public Progress getProgress() { ComputeMessageStatsResponse stats = - this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset())); - return Progress.from(range.getByteCount(), stats.getMessageBytes()); + this.backlogReader.computeMessageStats(Offset.of(nextOffset())); + return Progress.from(byteCount, stats.getMessageBytes()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java index d7526d88e089..623e20c09b45 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java @@ -27,8 +27,4 @@ final class PerServerPublisherCache { private PerServerPublisherCache() {} static final PublisherCache PUBLISHER_CACHE = new PublisherCache(); - - static { - Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close)); - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java index fdf792029863..a9f7a439f0da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java @@ -17,12 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableBiFunction; @@ -34,35 +35,31 @@ class PerSubscriptionPartitionSdf extends DoFn { private final Duration maxSleepTime; - private final ManagedBacklogReaderFactory backlogReaderFactory; private final SubscriptionPartitionProcessorFactory processorFactory; private final SerializableFunction offsetReaderFactory; - private final SerializableBiFunction + private final SerializableBiFunction< + SubscriptionPartition, OffsetRange, RestrictionTracker> trackerFactory; private final SerializableFunction committerFactory; PerSubscriptionPartitionSdf( Duration maxSleepTime, - ManagedBacklogReaderFactory backlogReaderFactory, SerializableFunction offsetReaderFactory, - SerializableBiFunction + SerializableBiFunction< + SubscriptionPartition, + OffsetRange, + RestrictionTracker> trackerFactory, SubscriptionPartitionProcessorFactory processorFactory, SerializableFunction committerFactory) { this.maxSleepTime = maxSleepTime; - this.backlogReaderFactory = backlogReaderFactory; this.processorFactory = processorFactory; this.offsetReaderFactory = offsetReaderFactory; this.trackerFactory = trackerFactory; this.committerFactory = committerFactory; } - @Teardown - public void teardown() { - backlogReaderFactory.close(); - } - @GetInitialWatermarkEstimatorState public Instant getInitialWatermarkState() { return Instant.EPOCH; @@ -75,7 +72,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In @ProcessElement public ProcessContinuation processElement( - RestrictionTracker tracker, + RestrictionTracker tracker, @Element SubscriptionPartition subscriptionPartition, OutputReceiver receiver) throws Exception { @@ -86,44 +83,38 @@ public ProcessContinuation processElement( processor .lastClaimed() .ifPresent( - lastClaimedOffset -> { + lastClaimedOffset -> + /* TODO(boyuanzz): When default dataflow can use finalizers, undo this. + finalizer.afterBundleCommit( + Instant.ofEpochMilli(Long.MAX_VALUE), + () -> */ { Committer committer = committerFactory.apply(subscriptionPartition); committer.startAsync().awaitRunning(); // Commit the next-to-deliver offset. try { committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); + } catch (ExecutionException e) { + throw toCanonical(checkArgumentNotNull(e.getCause())).underlying; } catch (Exception e) { - throw ExtractStatus.toCanonical(e).underlying; + throw toCanonical(e).underlying; } - blockingShutdown(committer); + committer.stopAsync().awaitTerminated(); }); return result; } } @GetInitialRestriction - public OffsetByteRange getInitialRestriction( - @Element SubscriptionPartition subscriptionPartition) { + public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) { try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) { Offset offset = reader.read(); - return OffsetByteRange.of( - new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */)); + return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */); } } @NewTracker - public TrackerWithProgress newTracker( - @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) { - return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range); - } - - @GetSize - public double getSize( - @Element SubscriptionPartition subscriptionPartition, - @Restriction OffsetByteRange restriction) { - if (restriction.getRange().getTo() != Long.MAX_VALUE) { - return restriction.getByteCount(); - } - return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining(); + public RestrictionTracker newTracker( + @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) { + return trackerFactory.apply(subscriptionPartition, range); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java index 3dbdec69db99..f8dc24baa7d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java @@ -23,50 +23,52 @@ import com.google.api.core.ApiService.State; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.MessageMetadata; +import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.HashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; /** A map of working publishers by PublisherOptions. */ -class PublisherCache implements AutoCloseable { - @GuardedBy("this") +class PublisherCache { + private final CloseableMonitor monitor = new CloseableMonitor(); + + private final Executor listenerExecutor = Executors.newSingleThreadExecutor(); + + @GuardedBy("monitor.monitor") private final HashMap> livePublishers = new HashMap<>(); - private synchronized void evict(PublisherOptions options) { - livePublishers.remove(options); - } - - synchronized Publisher get(PublisherOptions options) throws ApiException { + Publisher get(PublisherOptions options) throws ApiException { checkArgument(options.usesCache()); - Publisher publisher = livePublishers.get(options); - if (publisher != null) { + try (CloseableMonitor.Hold h = monitor.enter()) { + Publisher publisher = livePublishers.get(options); + if (publisher != null) { + return publisher; + } + publisher = Publishers.newPublisher(options); + livePublishers.put(options, publisher); + publisher.addListener( + new Listener() { + @Override + public void failed(State s, Throwable t) { + try (CloseableMonitor.Hold h = monitor.enter()) { + livePublishers.remove(options); + } + } + }, + listenerExecutor); + publisher.startAsync().awaitRunning(); return publisher; } - publisher = Publishers.newPublisher(options); - livePublishers.put(options, publisher); - publisher.addListener( - new Listener() { - @Override - public void failed(State s, Throwable t) { - evict(options); - } - }, - SystemExecutors.getFuturesExecutor()); - publisher.startAsync().awaitRunning(); - return publisher; } @VisibleForTesting - synchronized void set(PublisherOptions options, Publisher toCache) { - livePublishers.put(options, toCache); - } - - @Override - public synchronized void close() { - livePublishers.forEach(((options, publisher) -> publisher.stopAsync())); - livePublishers.clear(); + void set(PublisherOptions options, Publisher toCache) { + try (CloseableMonitor.Hold h = monitor.enter()) { + livePublishers.put(options, toCache); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java index 67ea6cf6062d..34012f72db15 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java @@ -17,27 +17,17 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.MessageMetadata; -import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; +import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; -import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; -import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; -import com.google.cloud.pubsublite.v1.AdminServiceClient; -import com.google.cloud.pubsublite.v1.AdminServiceSettings; -import com.google.cloud.pubsublite.v1.PublisherServiceClient; -import com.google.cloud.pubsublite.v1.PublisherServiceSettings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken; class Publishers { @@ -45,38 +35,6 @@ class Publishers { private Publishers() {} - private static AdminClient newAdminClient(PublisherOptions options) throws ApiException { - try { - return AdminClient.create( - AdminClientSettings.newBuilder() - .setServiceClient( - AdminServiceClient.create( - addDefaultSettings( - options.topicPath().location().extractRegion(), - AdminServiceSettings.newBuilder()))) - .setRegion(options.topicPath().location().extractRegion()) - .build()); - } catch (Throwable t) { - throw toCanonical(t).underlying; - } - } - - private static PublisherServiceClient newServiceClient( - PublisherOptions options, Partition partition) { - PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); - settingsBuilder = - addDefaultMetadata( - PubsubContext.of(FRAMEWORK), - RoutingMetadata.of(options.topicPath(), partition), - settingsBuilder); - try { - return PublisherServiceClient.create( - addDefaultSettings(options.topicPath().location().extractRegion(), settingsBuilder)); - } catch (Throwable t) { - throw toCanonical(t).underlying; - } - } - @SuppressWarnings("unchecked") static Publisher newPublisher(PublisherOptions options) throws ApiException { SerializableSupplier supplier = options.publisherSupplier(); @@ -86,18 +44,20 @@ static Publisher newPublisher(PublisherOptions options) throws checkArgument(token.isSupertypeOf(supplied.getClass())); return (Publisher) supplied; } - return PartitionCountWatchingPublisherSettings.newBuilder() - .setTopic(options.topicPath()) - .setPublisherFactory( - partition -> - SinglePartitionPublisherBuilder.newBuilder() - .setTopic(options.topicPath()) - .setPartition(partition) - .setServiceClient(newServiceClient(options, partition)) - .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS) - .build()) - .setAdminClient(newAdminClient(options)) - .build() - .instantiate(); + + TopicPath topic = options.topicPath(); + PartitionCountWatchingPublisherSettings.Builder publisherSettings = + PartitionCountWatchingPublisherSettings.newBuilder() + .setTopic(topic) + .setPublisherFactory( + partition -> + SinglePartitionPublisherBuilder.newBuilder() + .setTopic(topic) + .setPartition(partition) + .build()) + .setAdminClient( + AdminClient.create( + AdminClientSettings.newBuilder().setRegion(topic.location().region()).build())); + return publisherSettings.build().instantiate(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java index b93ac61f33be..ca1f2be41699 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java @@ -107,7 +107,7 @@ public static PTransform, PCollection> * } */ public static PTransform, PDone> write(PublisherOptions options) { - return new PTransform, PDone>() { + return new PTransform, PDone>("PubsubLiteIO") { @Override public PDone expand(PCollection input) { PubsubLiteSink sink = new PubsubLiteSink(options); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java index d0e3afa2ac07..d3acdfa35e05 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java @@ -28,14 +28,16 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.function.Consumer; import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError.Kind; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; /** A sink which publishes messages to Pub/Sub Lite. */ @SuppressWarnings({ @@ -54,6 +56,8 @@ class PubsubLiteSink extends DoFn { @GuardedBy("this") private transient Deque errorsSinceLastFinish; + private static final Executor executor = Executors.newCachedThreadPool(); + PubsubLiteSink(PublisherOptions options) { this.options = options; } @@ -85,7 +89,7 @@ public void failed(State s, Throwable t) { onFailure.accept(t); } }, - SystemExecutors.getFuturesExecutor()); + MoreExecutors.directExecutor()); if (!options.usesCache()) { publisher.startAsync(); } @@ -126,7 +130,7 @@ public void onFailure(Throwable t) { onFailure.accept(t); } }, - SystemExecutors.getFuturesExecutor()); + executor); } // Intentionally don't flush on bundle finish to allow multi-sink client reuse. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java index b6a9f5d59090..9875880584ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java @@ -23,7 +23,6 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; -import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.wire.Committer; @@ -32,6 +31,8 @@ import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -53,11 +54,10 @@ private void checkSubscription(SubscriptionPartition subscriptionPartition) thro checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath())); } - private Subscriber newSubscriber( - Partition partition, Offset initialOffset, Consumer> consumer) { + private Subscriber newSubscriber(Partition partition, Consumer> consumer) { try { return options - .getSubscriberFactory(partition, initialOffset) + .getSubscriberFactory(partition) .newSubscriber( messages -> consumer.accept( @@ -71,31 +71,23 @@ private Subscriber newSubscriber( private SubscriptionPartitionProcessor newPartitionProcessor( SubscriptionPartition subscriptionPartition, - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver) throws ApiException { checkSubscription(subscriptionPartition); return new SubscriptionPartitionProcessorImpl( tracker, receiver, - consumer -> - newSubscriber( - subscriptionPartition.partition(), - Offset.of(tracker.currentRestriction().getRange().getFrom()), - consumer), + consumer -> newSubscriber(subscriptionPartition.partition(), consumer), options.flowControlSettings()); } - private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) { + private RestrictionTracker newRestrictionTracker( + SubscriptionPartition subscriptionPartition, OffsetRange initial) { checkSubscription(subscriptionPartition); - return options.getBacklogReader(subscriptionPartition.partition()); - } - - private TrackerWithProgress newRestrictionTracker( - TopicBacklogReader backlogReader, OffsetByteRange initial) { return new OffsetByteRangeTracker( initial, - backlogReader, + options.getBacklogReader(subscriptionPartition.partition()), Stopwatch.createUnstarted(), options.minBundleTimeout(), LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); @@ -115,7 +107,7 @@ private TopicPath getTopicPath() { try (AdminClient admin = AdminClient.create( AdminClientSettings.newBuilder() - .setRegion(options.subscriptionPath().location().extractRegion()) + .setRegion(options.subscriptionPath().location().region()) .build())) { return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic()); } catch (Throwable t) { @@ -126,15 +118,25 @@ private TopicPath getTopicPath() { @Override public PCollection expand(PBegin input) { PCollection subscriptionPartitions; - subscriptionPartitions = - input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath())); + if (options.partitions().isEmpty()) { + subscriptionPartitions = + input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath())); + } else { + subscriptionPartitions = + input.apply( + Create.of( + options.partitions().stream() + .map( + partition -> + SubscriptionPartition.of(options.subscriptionPath(), partition)) + .collect(Collectors.toList()))); + } return subscriptionPartitions.apply( ParDo.of( new PerSubscriptionPartitionSdf( // Ensure we read for at least 5 seconds more than the bundle timeout. options.minBundleTimeout().plus(Duration.standardSeconds(5)), - new ManagedBacklogReaderFactoryImpl(this::newBacklogReader), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java index a9625be608fd..0d3afe2c60da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java @@ -23,7 +23,6 @@ import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; @@ -36,13 +35,13 @@ import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; -import com.google.cloud.pubsublite.proto.Cursor; -import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.v1.CursorServiceClient; import com.google.cloud.pubsublite.v1.CursorServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import java.io.Serializable; +import java.util.Set; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -69,6 +68,11 @@ public abstract class SubscriberOptions implements Serializable { /** Per-partition flow control parameters for this subscription. */ public abstract FlowControlSettings flowControlSettings(); + /** + * A set of partitions. If empty, continuously poll the set of partitions using an admin client. + */ + public abstract Set partitions(); + /** * The minimum wall time to pass before allowing bundle closure. * @@ -104,6 +108,7 @@ public abstract class SubscriberOptions implements Serializable { public static Builder newBuilder() { Builder builder = new AutoValue_SubscriberOptions.Builder(); return builder + .setPartitions(ImmutableSet.of()) .setFlowControlSettings(DEFAULT_FLOW_CONTROL) .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT); } @@ -114,19 +119,20 @@ private SubscriberServiceClient newSubscriberServiceClient(Partition partition) throws ApiException { try { SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder(); + settingsBuilder = addDefaultMetadata( PubsubContext.of(FRAMEWORK), RoutingMetadata.of(subscriptionPath(), partition), settingsBuilder); return SubscriberServiceClient.create( - addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder)); + addDefaultSettings(subscriptionPath().location().region(), settingsBuilder)); } catch (Throwable t) { throw toCanonical(t).underlying; } } - SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset) { + SubscriberFactory getSubscriberFactory(Partition partition) { SubscriberFactory factory = subscriberFactory(); if (factory != null) { return factory; @@ -137,10 +143,6 @@ SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset .setSubscriptionPath(subscriptionPath()) .setPartition(partition) .setServiceClient(newSubscriberServiceClient(partition)) - .setInitialLocation( - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(initialOffset.value())) - .build()) .build(); } @@ -148,7 +150,7 @@ private CursorServiceClient newCursorServiceClient() throws ApiException { try { return CursorServiceClient.create( addDefaultSettings( - subscriptionPath().location().extractRegion(), CursorServiceSettings.newBuilder())); + subscriptionPath().location().region(), CursorServiceSettings.newBuilder())); } catch (Throwable t) { throw toCanonical(t).underlying; } @@ -187,7 +189,7 @@ InitialOffsetReader getInitialOffsetReader(Partition partition) { return new InitialOffsetReaderImpl( CursorClient.create( CursorClientSettings.newBuilder() - .setRegion(subscriptionPath().location().extractRegion()) + .setRegion(subscriptionPath().location().region()) .build()), subscriptionPath(), partition); @@ -199,6 +201,8 @@ public abstract static class Builder { public abstract Builder setSubscriptionPath(SubscriptionPath path); // Optional parameters + public abstract Builder setPartitions(Set partitions); + public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); public abstract Builder setMinBundleTimeout(Duration minBundleTimeout); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java index 530c180ebd88..6bf362380ffe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java @@ -19,6 +19,7 @@ import com.google.cloud.pubsublite.proto.SequencedMessage; import java.io.Serializable; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -27,6 +28,6 @@ interface SubscriptionPartitionProcessorFactory extends Serializable { SubscriptionPartitionProcessor newProcessor( SubscriptionPartition subscriptionPartition, - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java index a086d18b2f65..8d2a137a27dc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; -import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; - import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; import com.google.cloud.pubsublite.Offset; @@ -26,8 +24,9 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Subscriber; -import com.google.cloud.pubsublite.internal.wire.SystemExecutors; +import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.FlowControlRequest; +import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.protobuf.util.Timestamps; import java.util.List; @@ -37,17 +36,19 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; import org.joda.time.Duration; import org.joda.time.Instant; class SubscriptionPartitionProcessorImpl extends Listener implements SubscriptionPartitionProcessor { - private final RestrictionTracker tracker; + private final RestrictionTracker tracker; private final OutputReceiver receiver; private final Subscriber subscriber; private final SettableFuture completionFuture = SettableFuture.create(); @@ -56,7 +57,7 @@ class SubscriptionPartitionProcessorImpl extends Listener @SuppressWarnings("methodref.receiver.bound.invalid") SubscriptionPartitionProcessorImpl( - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver, Function>, Subscriber> subscriberFactory, FlowControlSettings flowControlSettings) { @@ -69,15 +70,23 @@ class SubscriptionPartitionProcessorImpl extends Listener @Override @SuppressWarnings("argument.type.incompatible") public void start() throws CheckedApiException { - this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor()); + this.subscriber.addListener(this, MoreExecutors.directExecutor()); this.subscriber.startAsync(); this.subscriber.awaitRunning(); try { + this.subscriber + .seek( + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom())) + .build()) + .get(); this.subscriber.allowFlow( FlowControlRequest.newBuilder() .setAllowedBytes(flowControlSettings.bytesOutstanding()) .setAllowedMessages(flowControlSettings.messagesOutstanding()) .build()); + } catch (ExecutionException e) { + throw ExtractStatus.toCanonical(e.getCause()); } catch (Throwable t) { throw ExtractStatus.toCanonical(t); } @@ -116,7 +125,7 @@ public void failed(State from, Throwable failure) { @Override public void close() { - blockingShutdown(subscriber); + subscriber.stopAsync().awaitTerminated(); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java index 79db0f19f5dd..8c1dd9439e51 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java @@ -62,7 +62,7 @@ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) try (AdminClient adminClient = AdminClient.create( AdminClientSettings.newBuilder() - .setRegion(subscriptionPath.location().extractRegion()) + .setRegion(subscriptionPath.location().region()) .build())) { return setTopicPath( TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic())); @@ -81,9 +81,7 @@ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) TopicBacklogReader instantiate() throws ApiException { TopicStatsClientSettings settings = - TopicStatsClientSettings.newBuilder() - .setRegion(topicPath().location().extractRegion()) - .build(); + TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build(); TopicBacklogReader impl = new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition()); return new LimitingTopicBacklogReader(impl, Ticker.systemTicker()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java deleted file mode 100644 index 7f0d0309a597..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; - -public abstract class TrackerWithProgress - extends RestrictionTracker implements HasProgress {} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java index 5a31f4fc686d..f34ebb6a745e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java @@ -49,7 +49,7 @@ public class OffsetByteRangeTrackerTest { private static final double IGNORED_FRACTION = -10000000.0; private static final long MIN_BYTES = 1000; private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE); - private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class); + private final TopicBacklogReader reader = mock(TopicBacklogReader.class); @Spy Ticker ticker; private OffsetByteRangeTracker tracker; @@ -60,18 +60,14 @@ public void setUp() { when(ticker.read()).thenReturn(0L); tracker = new OffsetByteRangeTracker( - OffsetByteRange.of(RANGE, 0), - unownedBacklogReader, - Stopwatch.createUnstarted(ticker), - Duration.millis(500), - MIN_BYTES); + RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES); } @Test public void progressTracked() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11))); - when(unownedBacklogReader.computeMessageStats(Offset.of(125))) + when(reader.computeMessageStats(Offset.of(125))) .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build()); Progress progress = tracker.getProgress(); assertEquals(21, progress.getWorkCompleted(), .0001); @@ -80,7 +76,7 @@ public void progressTracked() { @Test public void getProgressStatsFailure() { - when(unownedBacklogReader.computeMessageStats(Offset.of(123))) + when(reader.computeMessageStats(Offset.of(123))) .thenThrow(new CheckedApiException(Code.INTERNAL).underlying); assertThrows(ApiException.class, tracker::getProgress); } @@ -90,15 +86,11 @@ public void getProgressStatsFailure() { public void claimSplitSuccess() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES))); - SplitResult splits = tracker.trySplit(IGNORED_FRACTION); - OffsetByteRange primary = splits.getPrimary(); - assertEquals(RANGE.getFrom(), primary.getRange().getFrom()); - assertEquals(10_001, primary.getRange().getTo()); - assertEquals(MIN_BYTES * 2, primary.getByteCount()); - OffsetByteRange residual = splits.getResidual(); - assertEquals(10_001, residual.getRange().getFrom()); - assertEquals(Long.MAX_VALUE, residual.getRange().getTo()); - assertEquals(0, residual.getByteCount()); + SplitResult splits = tracker.trySplit(IGNORED_FRACTION); + assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom()); + assertEquals(10_001, splits.getPrimary().getTo()); + assertEquals(10_001, splits.getResidual().getFrom()); + assertEquals(Long.MAX_VALUE, splits.getResidual().getTo()); assertEquals(splits.getPrimary(), tracker.currentRestriction()); tracker.checkDone(); assertNull(tracker.trySplit(IGNORED_FRACTION)); @@ -108,10 +100,10 @@ public void claimSplitSuccess() { @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"}) public void splitWithoutClaimEmpty() { when(ticker.read()).thenReturn(100000000000000L); - SplitResult splits = tracker.trySplit(IGNORED_FRACTION); - assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom()); - assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo()); - assertEquals(RANGE, splits.getResidual().getRange()); + SplitResult splits = tracker.trySplit(IGNORED_FRACTION); + assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom()); + assertEquals(RANGE.getFrom(), splits.getPrimary().getTo()); + assertEquals(RANGE, splits.getResidual()); assertEquals(splits.getPrimary(), tracker.currentRestriction()); tracker.checkDone(); assertNull(tracker.trySplit(IGNORED_FRACTION)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java index 0a4e3e7458f5..598037eef5f3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -52,8 +51,6 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath; import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -68,24 +65,22 @@ public class PerSubscriptionPartitionSdfTest { private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(10).plus(Duration.millis(10)); - private static final OffsetByteRange RESTRICTION = - OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0); + private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE); private static final SubscriptionPartition PARTITION = SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class)); @Mock SerializableFunction offsetReaderFactory; - @Mock ManagedBacklogReaderFactory backlogReaderFactory; - @Mock TopicBacklogReader backlogReader; - @Mock - SerializableBiFunction trackerFactory; + SerializableBiFunction< + SubscriptionPartition, OffsetRange, RestrictionTracker> + trackerFactory; @Mock SubscriptionPartitionProcessorFactory processorFactory; @Mock SerializableFunction committerFactory; @Mock InitialOffsetReader initialOffsetReader; - @Spy TrackerWithProgress tracker; + @Spy RestrictionTracker tracker; @Mock OutputReceiver output; @Mock SubscriptionPartitionProcessor processor; @@ -103,11 +98,9 @@ public void setUp() { when(trackerFactory.apply(any(), any())).thenReturn(tracker); when(committerFactory.apply(any())).thenReturn(committer); when(tracker.currentRestriction()).thenReturn(RESTRICTION); - when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader); sdf = new PerSubscriptionPartitionSdf( MAX_SLEEP_TIME, - backlogReaderFactory, offsetReaderFactory, trackerFactory, processorFactory, @@ -117,10 +110,9 @@ public void setUp() { @Test public void getInitialRestrictionReadSuccess() { when(initialOffsetReader.read()).thenReturn(example(Offset.class)); - OffsetByteRange range = sdf.getInitialRestriction(PARTITION); - assertEquals(example(Offset.class).value(), range.getRange().getFrom()); - assertEquals(Long.MAX_VALUE, range.getRange().getTo()); - assertEquals(0, range.getByteCount()); + OffsetRange range = sdf.getInitialRestriction(PARTITION); + assertEquals(example(Offset.class).value(), range.getFrom()); + assertEquals(Long.MAX_VALUE, range.getTo()); verify(offsetReaderFactory).apply(PARTITION); } @@ -133,13 +125,7 @@ public void getInitialRestrictionReadFailure() { @Test public void newTrackerCallsFactory() { assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION)); - verify(trackerFactory).apply(backlogReader, RESTRICTION); - } - - @Test - public void tearDownClosesBacklogReaderFactory() { - sdf.teardown(); - verify(backlogReaderFactory).close(); + verify(trackerFactory).apply(PARTITION, RESTRICTION); } @Test @@ -173,48 +159,12 @@ public void process() throws Exception { order2.verify(committer).awaitTerminated(); } - private static final class NoopManagedBacklogReaderFactory - implements ManagedBacklogReaderFactory { - @Override - public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { - return null; - } - - @Override - public void close() {} - } - @Test @SuppressWarnings("return.type.incompatible") public void dofnIsSerializable() throws Exception { ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream()); output.writeObject( new PerSubscriptionPartitionSdf( - MAX_SLEEP_TIME, - new NoopManagedBacklogReaderFactory(), - x -> null, - (x, y) -> null, - (x, y, z) -> null, - (x) -> null)); - } - - @Test - public void getProgressUnboundedRangeDelegates() { - Progress progress = Progress.from(0, 0.2); - when(tracker.getProgress()).thenReturn(progress); - assertTrue( - DoubleMath.fuzzyEquals( - progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001)); - verify(tracker).getProgress(); - } - - @Test - public void getProgressBoundedReturnsBytes() { - assertTrue( - DoubleMath.fuzzyEquals( - 123.0, - sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)), - .0001)); - verifyNoInteractions(tracker); + MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java deleted file mode 100644 index e2429423dd0c..000000000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import static org.junit.Assert.fail; - -import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.AdminClientSettings; -import com.google.cloud.pubsublite.BacklogLocation; -import com.google.cloud.pubsublite.CloudZone; -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.ProjectId; -import com.google.cloud.pubsublite.SubscriptionName; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.TopicName; -import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.proto.PubSubMessage; -import com.google.cloud.pubsublite.proto.SequencedMessage; -import com.google.cloud.pubsublite.proto.Subscription; -import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement; -import com.google.cloud.pubsublite.proto.Topic; -import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import com.google.protobuf.ByteString; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.FlatMapElements; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.joda.time.Duration; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(JUnit4.class) -public class ReadWriteIT { - private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class); - private static final CloudZone ZONE = CloudZone.parse("us-central1-b"); - private static final int MESSAGE_COUNT = 90; - - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - - private static ProjectId getProject(PipelineOptions options) { - return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject())); - } - - private static String randomName() { - return "beam_it_resource_" + ThreadLocalRandom.current().nextLong(); - } - - private static AdminClient newAdminClient() { - return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build()); - } - - private final Deque cleanupActions = new ArrayDeque<>(); - - private TopicPath createTopic(ProjectId id) throws Exception { - TopicPath toReturn = - TopicPath.newBuilder() - .setProject(id) - .setLocation(ZONE) - .setName(TopicName.of(randomName())) - .build(); - Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString()); - topic - .getPartitionConfigBuilder() - .setCount(2) - .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4)); - topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30)); - cleanupActions.addLast( - () -> { - try (AdminClient client = newAdminClient()) { - client.deleteTopic(toReturn).get(); - } catch (Throwable t) { - LOG.error("Failed to clean up topic.", t); - } - }); - try (AdminClient client = newAdminClient()) { - client.createTopic(topic.build()).get(); - } - return toReturn; - } - - private SubscriptionPath createSubscription(TopicPath topic) throws Exception { - SubscriptionPath toReturn = - SubscriptionPath.newBuilder() - .setProject(topic.project()) - .setLocation(ZONE) - .setName(SubscriptionName.of(randomName())) - .build(); - Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString()); - subscription - .getDeliveryConfigBuilder() - .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY); - subscription.setTopic(topic.toString()); - cleanupActions.addLast( - () -> { - try (AdminClient client = newAdminClient()) { - client.deleteSubscription(toReturn).get(); - } catch (Throwable t) { - LOG.error("Failed to clean up subscription.", t); - } - }); - try (AdminClient client = newAdminClient()) { - client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get(); - } - return toReturn; - } - - @After - public void tearDown() { - while (!cleanupActions.isEmpty()) { - cleanupActions.removeLast().run(); - } - } - - // Workaround for BEAM-12867 - // TODO(BEAM-12867): Remove this. - private static class CustomCreate extends PTransform, PCollection> { - @Override - public PCollection expand(PCollection input) { - return input.apply( - "createIndexes", - FlatMapElements.via( - new SimpleFunction>() { - @Override - public Iterable apply(Void input) { - return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList()); - } - })); - } - } - - public static void writeMessages(TopicPath topicPath, Pipeline pipeline) { - PCollection trigger = pipeline.apply(Create.of((Void) null)); - PCollection indexes = trigger.apply("createIndexes", new CustomCreate()); - PCollection messages = - indexes.apply( - "createMessages", - MapElements.via( - new SimpleFunction( - index -> - Message.builder() - .setData(ByteString.copyFromUtf8(index.toString())) - .build() - .toProto()) {})); - // Add UUIDs to messages for later deduplication. - messages = messages.apply("addUuids", PubsubLiteIO.addUuids()); - messages.apply( - "writeMessages", - PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build())); - } - - public static PCollection readMessages( - SubscriptionPath subscriptionPath, Pipeline pipeline) { - PCollection messages = - pipeline.apply( - "readMessages", - PubsubLiteIO.read( - SubscriberOptions.newBuilder() - .setSubscriptionPath(subscriptionPath) - // setMinBundleTimeout INTENDED FOR TESTING ONLY - // This sacrifices efficiency to make tests run faster. Do not use this in a - // real pipeline! - .setMinBundleTimeout(Duration.standardSeconds(5)) - .build())); - // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing. - return messages.apply( - "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build())); - } - - // This static out of band communication is needed to retain serializability. - @GuardedBy("ReadWriteIT.class") - private static final List received = new ArrayList<>(); - - private static synchronized void addMessageReceived(SequencedMessage message) { - received.add(message); - } - - private static synchronized List getTestQuickstartReceived() { - return ImmutableList.copyOf(received); - } - - private static PTransform, PCollection> - collectTestQuickstart() { - return MapElements.via( - new SimpleFunction() { - @Override - public Void apply(SequencedMessage input) { - addMessageReceived(input); - return null; - } - }); - } - - @Test - public void testReadWrite() throws Exception { - pipeline.getOptions().as(StreamingOptions.class).setStreaming(true); - pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false); - - TopicPath topic = createTopic(getProject(pipeline.getOptions())); - SubscriptionPath subscription = createSubscription(topic); - - // Publish some messages - writeMessages(topic, pipeline); - - // Read some messages. They should be deduplicated by the time we see them, so there should be - // exactly numMessages, one for every index in [0,MESSAGE_COUNT). - PCollection messages = readMessages(subscription, pipeline); - messages.apply("messageReceiver", collectTestQuickstart()); - pipeline.run(); - LOG.info("Running!"); - for (int round = 0; round < 120; ++round) { - Thread.sleep(1000); - Map receivedCounts = new HashMap<>(); - for (SequencedMessage message : getTestQuickstartReceived()) { - int id = Integer.parseInt(message.getMessage().getData().toStringUtf8()); - receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1); - } - LOG.info("Performing comparison round {}.\n", round); - boolean done = true; - List missing = new ArrayList<>(); - for (int id = 0; id < MESSAGE_COUNT; id++) { - int idCount = receivedCounts.getOrDefault(id, 0); - if (idCount == 0) { - missing.add(id); - done = false; - } - if (idCount > 1) { - fail(String.format("Failed to deduplicate message with id %s.", id)); - } - } - LOG.info("Still messing messages: {}.\n", missing); - if (done) { - return; - } - } - fail( - String.format( - "Failed to receive all messages after 2 minutes. Received %s messages.", - getTestQuickstartReceived().size())); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java index 3d74375897a0..dbf3b939d083 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Offset; @@ -39,6 +40,7 @@ import com.google.cloud.pubsublite.internal.wire.Subscriber; import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.FlowControlRequest; +import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.protobuf.util.Timestamps; import java.util.List; @@ -62,7 +64,7 @@ @RunWith(JUnit4.class) @SuppressWarnings("initialization.fields.uninitialized") public class SubscriptionPartitionProcessorImplTest { - @Spy RestrictionTracker tracker; + @Spy RestrictionTracker tracker; @Mock OutputReceiver receiver; @Mock Function>, Subscriber> subscriberFactory; @@ -81,10 +83,6 @@ private static SequencedMessage messageWithOffset(long offset) { .build(); } - private OffsetByteRange initialRange() { - return OffsetByteRange.of(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - } - @Before public void setUp() { initMocks(this); @@ -102,10 +100,17 @@ public void setUp() { @Test public void lifecycle() throws Exception { - when(tracker.currentRestriction()).thenReturn(initialRange()); + when(tracker.currentRestriction()) + .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); processor.start(); verify(subscriber).startAsync(); verify(subscriber).awaitRunning(); + verify(subscriber) + .seek( + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value())) + .build()); verify(subscriber) .allowFlow( FlowControlRequest.newBuilder() @@ -118,15 +123,29 @@ public void lifecycle() throws Exception { } @Test - public void lifecycleFlowControlThrows() throws Exception { - when(tracker.currentRestriction()).thenReturn(initialRange()); + public void lifecycleSeekThrows() throws Exception { + when(tracker.currentRestriction()) + .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + when(subscriber.seek(any())) + .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE))); doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any()); assertThrows(CheckedApiException.class, () -> processor.start()); } + @Test + public void lifecycleFlowControlThrows() { + when(tracker.currentRestriction()) + .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + when(subscriber.seek(any())) + .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE))); + assertThrows(CheckedApiException.class, () -> processor.start()); + } + @Test public void lifecycleSubscriberAwaitThrows() throws Exception { - when(tracker.currentRestriction()).thenReturn(initialRange()); + when(tracker.currentRestriction()) + .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); processor.start(); doThrow(new CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated(); assertThrows(ApiException.class, () -> processor.close()); @@ -136,19 +155,21 @@ public void lifecycleSubscriberAwaitThrows() throws Exception { @Test public void subscriberFailureFails() throws Exception { - when(tracker.currentRestriction()).thenReturn(initialRange()); + when(tracker.currentRestriction()) + .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); processor.start(); subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE)); ApiException e = - assertThrows( - // Longer wait is needed due to listener asynchrony. - ApiException.class, () -> processor.waitForCompletion(Duration.standardSeconds(1))); + assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO)); assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode()); } @Test public void allowFlowFailureFails() throws Exception { - when(tracker.currentRestriction()).thenReturn(initialRange()); + when(tracker.currentRestriction()) + .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); processor.start(); when(tracker.tryClaim(any())).thenReturn(true); doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());