diff --git a/CHANGES.md b/CHANGES.md index d58c51894746..c66533f11989 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,6 +77,7 @@ * Passing a tag into MultiProcessShared is now required in the Python SDK ([#26168](https://github.com/apache/beam/issues/26168)). * CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)). +* AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)). ## Deprecations diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/BasicDynamoDbClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/BasicDynamoDbClientProvider.java deleted file mode 100644 index 871204bc23b4..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/BasicDynamoDbClientProvider.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.dynamodb; - -import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.defaultFactory; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.net.URI; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; - -/** - * Basic implementation of {@link DynamoDbClientProvider} used by default in {@link DynamoDBIO}. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -public class BasicDynamoDbClientProvider implements DynamoDbClientProvider { - private final ClientConfiguration config; - - BasicDynamoDbClientProvider( - AwsCredentialsProvider credentialsProvider, String region, @Nullable URI endpoint) { - checkArgument(credentialsProvider != null, "awsCredentialsProvider can not be null"); - checkArgument(region != null, "region can not be null"); - config = ClientConfiguration.create(credentialsProvider, Region.of(region), endpoint); - } - - @Override - public DynamoDbClient getDynamoDbClient() { - return defaultFactory().create(DynamoDbClient.builder(), config, null).build(); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BasicDynamoDbClientProvider that = (BasicDynamoDbClientProvider) o; - return config.equals(that.config); - } - - @Override - public int hashCode() { - return config.hashCode(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java index cee56c529ca3..dc97371df0a4 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java @@ -22,19 +22,15 @@ import static java.util.stream.Collectors.toList; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import java.io.IOException; -import java.io.Serializable; -import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; @@ -43,7 +39,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.common.RetryConfiguration; import org.apache.beam.sdk.io.aws2.options.AwsOptions; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -169,9 +164,7 @@ public static Write write() { @AutoValue public abstract static class Read extends PTransform> { - abstract @Nullable ClientConfiguration getClientConfiguration(); - - abstract @Nullable DynamoDbClientProvider getDynamoDbClientProvider(); + abstract ClientConfiguration getClientConfiguration(); abstract @Nullable SerializableFunction getScanRequestFn(); @@ -188,8 +181,6 @@ abstract static class Builder { abstract Builder setClientConfiguration(ClientConfiguration config); - abstract Builder setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider); - abstract Builder setScanRequestFn(SerializableFunction fn); abstract Builder setSegmentId(Integer segmentId); @@ -201,49 +192,8 @@ abstract static class Builder { abstract Read build(); } - /** - * @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively - * you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}. - */ - @Deprecated - public Read withDynamoDbClientProvider(DynamoDbClientProvider clientProvider) { - checkArgument(clientProvider != null, "DynamoDbClientProvider cannot be null"); - return toBuilder() - .setClientConfiguration(null) - .setDynamoDbClientProvider(clientProvider) - .build(); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Read withDynamoDbClientProvider( - AwsCredentialsProvider credentials, String region, URI endpoint) { - return updateClientConfig( - b -> - b.credentialsProvider(credentials) - .region(Region.of(region)) - .endpoint(endpoint) - .build()); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Read withDynamoDbClientProvider(AwsCredentialsProvider credentials, String region) { - return updateClientConfig( - b -> b.credentialsProvider(credentials).region(Region.of(region)).build()); - } - /** Configuration of DynamoDB client. */ public Read withClientConfiguration(ClientConfiguration config) { - return updateClientConfig(ignore -> config); - } - - private Read updateClientConfig( - Function fn) { - checkState( - getDynamoDbClientProvider() == null, - "Legacy DynamoDbClientProvider is set, but incompatible with ClientConfiguration."); - ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder()); checkArgument(config != null, "ClientConfiguration cannot be null"); return toBuilder().setClientConfiguration(config).build(); } @@ -288,11 +238,8 @@ public PCollection expand(PBegin input) { (scanRequest.totalSegments() != null && scanRequest.totalSegments() > 0), "TotalSegments is required with withScanRequestFn() and greater zero"); - if (getDynamoDbClientProvider() == null) { - checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null"); - AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); - ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); - } + AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); + ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); PCollection> splits = input.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn<>())); @@ -320,10 +267,6 @@ public void processElement(@Element Read spec, OutputReceiver> out) { /** A {@link DoFn} executing the ScanRequest to read from DynamoDb. */ private static class ReadFn extends DoFn, T> { private DynamoDbClient buildClient(Read spec, AwsOptions opts) { - if (spec.getDynamoDbClientProvider() != null) { - // build client using legacy DynamoDbClientProvider - return spec.getDynamoDbClientProvider().getDynamoDbClient(); - } return ClientBuilderFactory.buildClient( opts, DynamoDbClient.builder(), spec.getClientConfiguration()); } @@ -364,61 +307,11 @@ public List> apply(@Nullable ScanResponse scanRespon } } - /** - * Legacy retry configuration. - * - *

Warning: Max accumulative retry latency is silently ignored as it is not supported by - * the AWS SDK. - * - * @deprecated Use {@link org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to - * delegate retries to the AWS SDK. - */ - @AutoValue - @Deprecated - public abstract static class RetryConfiguration implements Serializable { - abstract int getMaxAttempts(); - - abstract Duration getMaxDuration(); - - public static Builder builder() { - return new AutoValue_DynamoDBIO_RetryConfiguration.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setMaxAttempts(int maxAttempts); - - /** - * @deprecated Warning, max accumulative retry latency is silently ignored as it is not - * supported by the AWS SDK. - */ - @Deprecated - public abstract Builder setMaxDuration(Duration maxDuration); - - abstract RetryConfiguration autoBuild(); - - public RetryConfiguration build() { - RetryConfiguration config = autoBuild(); - checkArgument(config.getMaxAttempts() > 0, "maxAttempts should be greater than 0"); - return config; - } - } - - org.apache.beam.sdk.io.aws2.common.RetryConfiguration convertLegacyConfig() { - int totalAttempts = getMaxAttempts() * 3; // 3 SDK attempts per user attempt - return org.apache.beam.sdk.io.aws2.common.RetryConfiguration.builder() - .numRetries(totalAttempts - 1) - .build(); - } - } - /** Write a PCollection data into DynamoDB. */ @AutoValue public abstract static class Write extends PTransform, PCollection> { - abstract @Nullable ClientConfiguration getClientConfiguration(); - - abstract @Nullable DynamoDbClientProvider getDynamoDbClientProvider(); + abstract ClientConfiguration getClientConfiguration(); abstract @Nullable SerializableFunction> getWriteItemMapperFn(); @@ -430,8 +323,6 @@ public abstract static class Write extends PTransform, PCollec abstract static class Builder { abstract Builder setClientConfiguration(ClientConfiguration config); - abstract Builder setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider); - abstract Builder setWriteItemMapperFn( SerializableFunction> writeItemMapperFn); @@ -440,65 +331,12 @@ abstract Builder setWriteItemMapperFn( abstract Write build(); } - /** - * @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively - * you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}. - */ - @Deprecated - public Write withDynamoDbClientProvider(DynamoDbClientProvider clientProvider) { - checkArgument(clientProvider != null, "DynamoDbClientProvider cannot be null"); - return toBuilder() - .setClientConfiguration(null) - .setDynamoDbClientProvider(clientProvider) - .build(); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Write withDynamoDbClientProvider( - AwsCredentialsProvider credentials, String region, URI endpoint) { - return updateClientConfig( - b -> - b.credentialsProvider(credentials) - .region(Region.of(region)) - .endpoint(endpoint) - .build()); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Write withDynamoDbClientProvider(AwsCredentialsProvider credentials, String region) { - return updateClientConfig( - b -> b.credentialsProvider(credentials).region(Region.of(region)).build()); - } - /** Configuration of DynamoDB client. */ public Write withClientConfiguration(ClientConfiguration config) { - return updateClientConfig(ignore -> config); - } - - private Write updateClientConfig( - Function fn) { - checkState( - getDynamoDbClientProvider() == null, - "Legacy DynamoDbClientProvider is set, but incompatible with ClientConfiguration."); - ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder()); checkArgument(config != null, "ClientConfiguration cannot be null"); return toBuilder().setClientConfiguration(config).build(); } - /** - * Retry configuration of DynamoDB client. - * - * @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} with {@link - * org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to delegate retries to the - * AWS SDK. - */ - @Deprecated - public Write withRetryConfiguration(RetryConfiguration retry) { - return updateClientConfig(b -> b.retry(retry.convertLegacyConfig()).build()); - } - public Write withWriteRequestMapperFn( SerializableFunction> writeItemMapperFn) { return toBuilder().setWriteItemMapperFn(writeItemMapperFn).build(); @@ -510,11 +348,9 @@ public Write withDeduplicateKeys(List deduplicateKeys) { @Override public PCollection expand(PCollection input) { - if (getDynamoDbClientProvider() == null) { - checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null"); - AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); - ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); - } + checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null"); + AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); + ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); return input.apply(ParDo.of(new WriteFn<>(this))); } @@ -544,14 +380,8 @@ static class WriteFn extends DoFn { @Setup public void setup(PipelineOptions options) { ClientConfiguration clientConfig = spec.getClientConfiguration(); - if (spec.getDynamoDbClientProvider() != null) { - // build client using legacy DynamoDbClientProvider - client = spec.getDynamoDbClientProvider().getDynamoDbClient(); - } else { - AwsOptions awsOpts = options.as(AwsOptions.class); - client = - ClientBuilderFactory.buildClient(awsOpts, DynamoDbClient.builder(), clientConfig); - } + AwsOptions awsOpts = options.as(AwsOptions.class); + client = ClientBuilderFactory.buildClient(awsOpts, DynamoDbClient.builder(), clientConfig); // resume from partial failures resumeBackoff = FluentBackoff.DEFAULT.withMaxRetries(BATCH_SIZE); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDbClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDbClientProvider.java deleted file mode 100644 index 49a322d9f368..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDbClientProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.dynamodb; - -import java.io.Serializable; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; - -/** - * Provides instances of DynamoDB clients. - * - *

Please note, that any instance of {@link DynamoDbClientProvider} must be {@link Serializable} - * to ensure it can be sent to worker machines. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -public interface DynamoDbClientProvider extends Serializable { - DynamoDbClient getDynamoDbClient(); -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/AWSClientsProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/AWSClientsProvider.java deleted file mode 100644 index 609c2d891927..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/AWSClientsProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.kinesis; - -import java.io.Serializable; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; -import software.amazon.awssdk.services.kinesis.KinesisClient; - -/** - * Provides instances of AWS clients. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -public interface AWSClientsProvider extends Serializable { - KinesisClient getKinesisClient(); - - CloudWatchClient getCloudWatchClient(); -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/BasicKinesisProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/BasicKinesisProvider.java deleted file mode 100644 index 8da72452486d..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/BasicKinesisProvider.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.kinesis; - -import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.defaultFactory; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.net.URI; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; -import software.amazon.awssdk.services.kinesis.KinesisClient; - -/** - * Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -class BasicKinesisProvider implements AWSClientsProvider { - private final ClientConfiguration config; - - BasicKinesisProvider( - AwsCredentialsProvider credentialsProvider, Region region, @Nullable String serviceEndpoint) { - checkArgument(credentialsProvider != null, "awsCredentialsProvider can not be null"); - checkArgument(region != null, "region can not be null"); - URI endpoint = serviceEndpoint != null ? URI.create(serviceEndpoint) : null; - config = ClientConfiguration.create(credentialsProvider, region, endpoint); - } - - @Override - public KinesisClient getKinesisClient() { - return defaultFactory().create(KinesisClient.builder(), config, null).build(); - } - - @Override - public CloudWatchClient getCloudWatchClient() { - return defaultFactory().create(CloudWatchClient.builder(), config, null).build(); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BasicKinesisProvider that = (BasicKinesisProvider) o; - return config.equals(that.config); - } - - @Override - public int hashCode() { - return config.hashCode(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java index 255efe5fdc6d..70e2f36ab38e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java @@ -27,7 +27,6 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; import java.math.BigInteger; -import java.net.URI; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -38,7 +37,6 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.ThreadSafe; @@ -81,9 +79,7 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -371,9 +367,7 @@ public abstract static class Read extends PTransform - b.credentialsProvider(awsCredentialsProvider) - .region(region) - .endpoint(endpoint) - .build()); - } - /** Configuration of Kinesis & Cloudwatch clients. */ public Read withClientConfiguration(ClientConfiguration config) { - return updateClientConfig(ignore -> config); - } - - private Read updateClientConfig(Function fn) { - checkState( - getAWSClientsProvider() == null, - "Legacy AWSClientsProvider is set, but incompatible with ClientConfiguration."); - ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder()); checkArgument(config != null, "ClientConfiguration cannot be null"); return toBuilder().setClientConfiguration(config).build(); } @@ -625,11 +564,8 @@ public Read withMaxCapacityPerShard(Integer maxCapacity) { public PCollection expand(PBegin input) { checkArgument(getWatermarkPolicyFactory() != null, "WatermarkPolicyFactory is required"); checkArgument(getRateLimitPolicyFactory() != null, "RateLimitPolicyFactory is required"); - if (getAWSClientsProvider() == null) { - checkArgument(getClientConfiguration() != null, "ClientConfiguration is required"); - AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); - ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); - } + AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); + ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); Unbounded unbounded = org.apache.beam.sdk.io.Read.from(new KinesisSource(this)); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.java index dfe96e450140..8e1d0b091f84 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.aws2.kinesis; import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.buildClient; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; @@ -30,7 +29,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read; import org.apache.beam.sdk.io.aws2.options.AwsOptions; @@ -78,7 +76,9 @@ public List split(int desiredNumSplits, PipelineOptions options) } // in case a new checkpoint is created from scratch: else { - try (KinesisClient client = createKinesisClient(spec, options)) { + AwsOptions awsOptions = options.as(AwsOptions.class); + ClientConfiguration config = spec.getClientConfiguration(); + try (KinesisClient client = buildClient(awsOptions, KinesisClient.builder(), config)) { checkpoint = generateInitCheckpoint(spec, client); } } @@ -161,41 +161,23 @@ UnboundedReader initReader( } } - KinesisClient createKinesisClient(Read spec, PipelineOptions options) { - AwsOptions awsOptions = options.as(AwsOptions.class); - if (spec.getAWSClientsProvider() != null) { - return Preconditions.checkArgumentNotNull(spec.getAWSClientsProvider()).getKinesisClient(); - } else { - ClientConfiguration config = - Preconditions.checkArgumentNotNull(spec.getClientConfiguration()); - return buildClient(awsOptions, KinesisClient.builder(), config); - } - } - private SimplifiedKinesisClient createSimplifiedKinesisClient(PipelineOptions options) { AwsOptions awsOptions = options.as(AwsOptions.class); - Supplier kinesisSupplier = () -> createKinesisClient(spec, options); - Supplier cloudWatchSupplier; - AWSClientsProvider provider = spec.getAWSClientsProvider(); - if (provider != null) { - cloudWatchSupplier = provider::getCloudWatchClient; - } else { - ClientConfiguration config = - Preconditions.checkArgumentNotNull(spec.getClientConfiguration()); - cloudWatchSupplier = () -> buildClient(awsOptions, CloudWatchClient.builder(), config); - } + ClientConfiguration config = spec.getClientConfiguration(); + Supplier kinesisSupplier = + () -> buildClient(awsOptions, KinesisClient.builder(), config); + Supplier cloudWatchSupplier = + () -> buildClient(awsOptions, CloudWatchClient.builder(), config); + return new SimplifiedKinesisClient( kinesisSupplier, cloudWatchSupplier, spec.getRequestRecordsLimit()); } - static KinesisAsyncClient createAsyncClient(Read spec, PipelineOptions options) { + private static KinesisAsyncClient createAsyncClient(Read spec, PipelineOptions options) { AwsOptions awsOptions = options.as(AwsOptions.class); - ClientBuilderFactory builderFactory = ClientBuilderFactory.getFactory(awsOptions); - KinesisAsyncClientBuilder adjustedBuilder = + KinesisAsyncClientBuilder builder = KinesisClientUtil.adjustKinesisClientBuilder(KinesisAsyncClient.builder()); - return builderFactory - .create(adjustedBuilder, checkArgumentNotNull(spec.getClientConfiguration()), awsOptions) - .build(); + return buildClient(awsOptions, builder, spec.getClientConfiguration()); } /** diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsClientProvider.java deleted file mode 100644 index e03f297b673e..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsClientProvider.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.defaultFactory; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.net.URI; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sns.SnsClient; - -/** - * Basic implementation of {@link SnsClientProvider} used by default in {@link SnsIO}. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -class BasicSnsClientProvider implements SnsClientProvider { - private final ClientConfiguration config; - - BasicSnsClientProvider( - AwsCredentialsProvider credentialsProvider, String region, @Nullable URI endpoint) { - checkArgument(credentialsProvider != null, "awsCredentialsProvider can not be null"); - checkArgument(region != null, "region can not be null"); - config = ClientConfiguration.create(credentialsProvider, Region.of(region), endpoint); - } - - @Override - public SnsClient getSnsClient() { - return defaultFactory().create(SnsClient.builder(), config, null).build(); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BasicSnsClientProvider that = (BasicSnsClientProvider) o; - return config.equals(that.config); - } - - @Override - public int hashCode() { - return config.hashCode(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java deleted file mode 100644 index 7fc783535800..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.io.Serializable; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import software.amazon.awssdk.services.sns.SnsClient; - -/** - * Provides instances of SNS client. - * - *

Please note, that any instance of {@link SnsClientProvider} must be {@link Serializable} to - * ensure it can be sent to worker machines. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -public interface SnsClientProvider extends Serializable { - SnsClient getSnsClient(); -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java index 5732192d5398..f8e40d00fadd 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java @@ -19,10 +19,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; -import java.io.Serializable; import java.net.URI; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -39,9 +37,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -128,48 +124,17 @@ public static WriteAsync writeAsync() { return new AutoValue_SnsIO_WriteAsync.Builder().build(); } - /** - * Legacy retry configuration. - * - *

Warning: Max accumulative retry latency is silently ignored as it is not supported by - * the AWS SDK. - * - * @deprecated Use {@link org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to - * delegate retries to the AWS SDK. - */ - @AutoValue - @Deprecated - public abstract static class RetryConfiguration implements Serializable { - abstract int getMaxAttempts(); - - /** @deprecated Use {@link org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead. */ - @Deprecated - public static RetryConfiguration create(int maxAttempts, Duration ignored) { - checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0"); - return new AutoValue_SnsIO_RetryConfiguration(maxAttempts); - } - - org.apache.beam.sdk.io.aws2.common.RetryConfiguration convertLegacyConfig() { - int totalAttempts = getMaxAttempts() * 3; // 3 SDK attempts per user attempt - return org.apache.beam.sdk.io.aws2.common.RetryConfiguration.builder() - .numRetries(totalAttempts - 1) - .build(); - } - } - /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write extends PTransform, PCollection> { - abstract @Nullable ClientConfiguration getClientConfiguration(); + abstract ClientConfiguration getClientConfiguration(); abstract @Nullable String getTopicArn(); abstract @Nullable SerializableFunction getPublishRequestBuilder(); - abstract @Nullable SnsClientProvider getSnsClientProvider(); - abstract @Nullable Coder getCoder(); abstract Builder builder(); @@ -184,8 +149,6 @@ abstract static class Builder { abstract Builder setPublishRequestBuilder( SerializableFunction requestBuilder); - abstract Builder setSnsClientProvider(SnsClientProvider snsClientProvider); - abstract Builder setCoder(Coder coder); abstract Write build(); @@ -222,62 +185,12 @@ public Write withPublishRequestFn(SerializableFunction pub return builder().setPublishRequestBuilder(m -> publishRequestFn.apply(m).toBuilder()).build(); } - /** - * @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively - * you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}. - */ - @Deprecated - public Write withSnsClientProvider(SnsClientProvider clientProvider) { - checkArgument(clientProvider != null, "SnsClientProvider cannot be null"); - return builder().setClientConfiguration(null).setSnsClientProvider(clientProvider).build(); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Write withSnsClientProvider(AwsCredentialsProvider credentials, String region) { - return updateClientConfig( - b -> b.credentialsProvider(credentials).region(Region.of(region)).build()); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Write withSnsClientProvider( - AwsCredentialsProvider credentials, String region, URI endpoint) { - return updateClientConfig( - b -> - b.credentialsProvider(credentials) - .region(Region.of(region)) - .endpoint(endpoint) - .build()); - } - /** Configuration of SNS client. */ public Write withClientConfiguration(ClientConfiguration config) { - return updateClientConfig(ignore -> config); - } - - private Write updateClientConfig( - Function fn) { - checkState( - getSnsClientProvider() == null, - "Legacy SnsClientProvider is set, but incompatible with ClientConfiguration."); - ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder()); checkArgument(config != null, "ClientConfiguration cannot be null"); return builder().setClientConfiguration(config).build(); } - /** - * Retry configuration of SNS client. - * - * @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} with {@link - * org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to delegate retries to the - * AWS SDK. - */ - @Deprecated - public Write withRetryConfiguration(RetryConfiguration retry) { - return updateClientConfig(b -> b.retry(retry.convertLegacyConfig()).build()); - } - /** * Encode the full {@link PublishResponse} object, including sdkResponseMetadata and * sdkHttpMetadata with the HTTP response headers. @@ -316,10 +229,8 @@ public PCollection expand(PCollection input) { checkArgument(getPublishRequestBuilder() != null, "withPublishRequestBuilder() is required"); AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); - if (getSnsClientProvider() == null) { - checkArgument(getClientConfiguration() != null, "withClientConfiguration() is required"); - ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); - } + checkArgument(getClientConfiguration() != null, "withClientConfiguration() is required"); + ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); if (getTopicArn() != null) { checkArgument(checkTopicExists(awsOptions), "Topic arn %s does not exist", getTopicArn()); } @@ -343,13 +254,8 @@ private boolean checkTopicExists(AwsOptions options) { } private SnsClient buildClient(AwsOptions options) { - if (getSnsClientProvider() != null) { - // build client using legacy SnsClientProvider - return getSnsClientProvider().getSnsClient(); - } else { - return ClientBuilderFactory.buildClient( - options.as(AwsOptions.class), SnsClient.builder(), getClientConfiguration()); - } + return ClientBuilderFactory.buildClient( + options.as(AwsOptions.class), SnsClient.builder(), getClientConfiguration()); } static class SnsWriterFn extends DoFn { diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/BasicSqsClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/BasicSqsClientProvider.java deleted file mode 100644 index c81d512b9abb..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/BasicSqsClientProvider.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sqs; - -import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.defaultFactory; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.net.URI; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sqs.SqsClient; - -/** - * Basic implementation of {@link SqsClientProvider} used by default in {@link SqsIO}. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -class BasicSqsClientProvider implements SqsClientProvider { - private final ClientConfiguration config; - - BasicSqsClientProvider( - AwsCredentialsProvider credentialsProvider, String region, @Nullable URI endpoint) { - checkArgument(credentialsProvider != null, "awsCredentialsProvider can not be null"); - checkArgument(region != null, "region can not be null"); - config = ClientConfiguration.create(credentialsProvider, Region.of(region), endpoint); - } - - @Override - public SqsClient getSqsClient() { - return defaultFactory().create(SqsClient.builder(), config, null).build(); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BasicSqsClientProvider that = (BasicSqsClientProvider) o; - return config.equals(that.config); - } - - @Override - public int hashCode() { - return config.hashCode(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsClientProvider.java deleted file mode 100644 index c0db02527552..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsClientProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sqs; - -import java.io.Serializable; -import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import software.amazon.awssdk.services.sqs.SqsClient; - -/** - * Provides instances of Sqs clients. - * - *

Please note, that any instance of {@link SqsClientProvider} must be {@link Serializable} to - * ensure it can be sent to worker machines. - * - * @deprecated Configure a custom {@link org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory} - * using {@link AwsOptions#getClientBuilderFactory()} instead. - */ -@Deprecated -public interface SqsClientProvider extends Serializable { - SqsClient getSqsClient(); -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java index 333df8302075..72befc6003fe 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java @@ -18,11 +18,8 @@ package org.apache.beam.sdk.io.aws2.sqs; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; -import java.net.URI; import java.util.function.Consumer; import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; @@ -34,7 +31,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -122,7 +118,7 @@ private SqsIO() {} @AutoValue public abstract static class Read extends PTransform> { - abstract @Nullable ClientConfiguration clientConfiguration(); + abstract ClientConfiguration clientConfiguration(); abstract @Nullable String queueUrl(); @@ -130,8 +126,6 @@ public abstract static class Read extends PTransform b.credentialsProvider(credentials).region(Region.of(region)).build()); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Read withSqsClientProvider( - AwsCredentialsProvider credentials, String region, URI endpoint) { - return updateClientConfig( - b -> - b.credentialsProvider(credentials) - .region(Region.of(region)) - .endpoint(endpoint) - .build()); - } - /** Configuration of SQS client. */ public Read withClientConfiguration(ClientConfiguration config) { - return updateClientConfig(ignore -> config); - } - - private Read updateClientConfig(Function fn) { - checkState( - sqsClientProvider() == null, - "Legacy SqsClientProvider is set, but incompatible with ClientConfiguration."); - ClientConfiguration config = fn.apply(clientConfiguration().toBuilder()); checkArgument(config != null, "ClientConfiguration cannot be null"); return builder().setClientConfiguration(config).build(); } @Override public PCollection expand(PBegin input) { - if (clientConfiguration() != null) { - AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); - ClientBuilderFactory.validate(awsOptions, clientConfiguration()); - } + AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); + ClientBuilderFactory.validate(awsOptions, clientConfiguration()); org.apache.beam.sdk.io.Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from(new SqsUnboundedSource(this)); @@ -243,9 +196,7 @@ public PCollection expand(PBegin input) { @AutoValue public abstract static class Write extends PTransform, PDone> { - abstract @Nullable ClientConfiguration getClientConfiguration(); - - abstract @Nullable SqsClientProvider getSqsClientProvider(); + abstract ClientConfiguration getClientConfiguration(); abstract Builder builder(); @@ -253,62 +204,19 @@ public abstract static class Write extends PTransform b.credentialsProvider(credentials).region(Region.of(region)).build()); - } - - /** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */ - @Deprecated - public Write withSqsClientProvider( - AwsCredentialsProvider credentials, String region, URI endpoint) { - return updateClientConfig( - b -> - b.credentialsProvider(credentials) - .region(Region.of(region)) - .endpoint(endpoint) - .build()); - } - /** Configuration of SQS client. */ public Write withClientConfiguration(ClientConfiguration config) { - return updateClientConfig(ignore -> config); - } - - private Write updateClientConfig( - Function fn) { - checkState( - getSqsClientProvider() == null, - "Legacy SqsClientProvider is set, but incompatible with ClientConfiguration."); - ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder()); checkArgument(config != null, "ClientConfiguration cannot be null"); return builder().setClientConfiguration(config).build(); } @Override public PDone expand(PCollection input) { - if (getSqsClientProvider() == null) { - checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null"); - AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); - ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); - } + AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class); + ClientBuilderFactory.validate(awsOptions, getClientConfiguration()); input.apply(ParDo.of(new SqsWriteFn(this))); return PDone.in(input.getPipeline()); @@ -325,15 +233,10 @@ private static class SqsWriteFn extends DoFn { @Setup public void setup(PipelineOptions options) throws Exception { - if (spec.getSqsClientProvider() != null) { - // build client using legacy SnsClientProvider - sqs = spec.getSqsClientProvider().getSqsClient(); - } else { - AwsOptions awsOpts = options.as(AwsOptions.class); - sqs = - ClientBuilderFactory.buildClient( - awsOpts, SqsClient.builder(), spec.getClientConfiguration()); - } + AwsOptions awsOpts = options.as(AwsOptions.class); + sqs = + ClientBuilderFactory.buildClient( + awsOpts, SqsClient.builder(), spec.getClientConfiguration()); } @ProcessElement diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java index 5b357dd85d3b..56a1be24115b 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java @@ -453,13 +453,8 @@ private String queueUrl() { private void initClient() { if (sqsClient == null) { - if (source.getRead().sqsClientProvider() != null) { - // build client using legacy SqsClientProvider - sqsClient = source.getRead().sqsClientProvider().getSqsClient(); - } else { - ClientConfiguration config = source.getRead().clientConfiguration(); - sqsClient = ClientBuilderFactory.buildClient(awsOptions, SqsClient.builder(), config); - } + ClientConfiguration config = source.getRead().clientConfiguration(); + sqsClient = ClientBuilderFactory.buildClient(awsOptions, SqsClient.builder(), config); } } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/MockClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/MockClientBuilderFactory.java index 0be74c00fa85..e7ca4c823a9b 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/MockClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/MockClientBuilderFactory.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.options.AwsOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +36,10 @@ /** * A mock {@link ClientBuilderFactory} to facilitate unit tests with mocked AWS clients. This - * factory returns a mocked builder with a preconfigured client (using {@link #set(TestPipeline, - * Class, Object)}), if available. Otherwise and empty mock is returned. + * factory returns a mocked builder with a preconfigured client which is tied to a specific instance + * of a pipeline or pipeline options respectively (using {@link #set(TestPipeline, Class, Object)} + * or {@link #set(PipelineOptions, Class, Object)}), if available. Otherwise and empty mock is + * returned. * *

Example usage: * @@ -46,7 +49,7 @@ * * @Before * public void configureClientBuilderFactory() { - * StaticClientBuilderFactory.prepare(pipeline, S3ClientBuilder.class, s3); + * StaticClientBuilderFactory.set(pipeline, S3ClientBuilder.class, s3); * } * } */ @@ -55,19 +58,21 @@ public class MockClientBuilderFactory implements ClientBuilderFactory { @SuppressWarnings("rawtypes") private static final WeakHashMap< - TestPipeline, Map, AwsClientBuilder>> + PipelineOptions, Map, AwsClientBuilder>> CLIENTS = new WeakHashMap<>(); public static , ClientT> void set( TestPipeline pipeline, Class builderClass, ClientT clientT) { - pipeline - .getOptions() - .as(AwsOptions.class) - .setClientBuilderFactory(MockClientBuilderFactory.class); + set(pipeline.getOptions(), builderClass, clientT); + } + + public static , ClientT> void set( + PipelineOptions options, Class builderClass, ClientT clientT) { + options.as(AwsOptions.class).setClientBuilderFactory(MockClientBuilderFactory.class); BuilderT builder = mock(builderClass); when(builder.build()).thenReturn(clientT); - CLIENTS.computeIfAbsent(pipeline, ignore -> new HashMap<>()).put(builderClass, builder); + CLIENTS.computeIfAbsent(options, ignore -> new HashMap<>()).put(builderClass, builder); } @Override @@ -77,7 +82,7 @@ public , ClientT> BuilderT // safe to cast: builder is instance of key (builder class) and value is mocked accordingly Optional, AwsClientBuilder>> mock = CLIENTS.entrySet().stream() - .filter(kv -> kv.getKey().getOptions().getOptionsId() == options.getOptionsId()) + .filter(kv -> kv.getKey().getOptionsId() == options.getOptionsId()) .flatMap(kv -> kv.getValue().entrySet().stream()) .filter(b -> b.getKey().isInstance(builder)) .findFirst(); diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/BasicDynamoDbClientProviderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/BasicDynamoDbClientProviderTest.java deleted file mode 100644 index 29c41ad6a517..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/BasicDynamoDbClientProviderTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.dynamodb; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.util.SerializableUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; - -/** Tests on {@link BasicDynamoDbClientProvider}. */ -@RunWith(JUnit4.class) -public class BasicDynamoDbClientProviderTest { - - @Test - public void testSerialization() { - AwsCredentialsProvider awsCredentialsProvider = - StaticCredentialsProvider.create( - AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")); - - BasicDynamoDbClientProvider dynamoDbClientProvider = - new BasicDynamoDbClientProvider(awsCredentialsProvider, "us-east-1", null); - - byte[] serializedBytes = SerializableUtils.serializeToByteArray(dynamoDbClientProvider); - - BasicDynamoDbClientProvider dynamoDbClientProviderDeserialized = - (BasicDynamoDbClientProvider) - SerializableUtils.deserializeFromByteArray(serializedBytes, "Aws Credentials Provider"); - - assertEquals(dynamoDbClientProvider, dynamoDbClientProviderDeserialized); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOReadTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOReadTest.java index 505fff999bd4..2abbca2cedb1 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOReadTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOReadTest.java @@ -18,26 +18,21 @@ package org.apache.beam.sdk.io.aws2.dynamodb; import static java.lang.Math.min; -import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getLast; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.transform; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.net.URI; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.stream.IntStream; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIO.Read; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -53,9 +48,6 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -84,24 +76,11 @@ private Read>> dynamoDbRead(Integer segments) { @Test public void testReadOneSegment() { - readOneSegment(identity()); - } - - @Test - public void testReadOneSegmentWithLegacyProvider() { - MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, null); - readOneSegment( - read -> read.withDynamoDbClientProvider(StaticDynamoDBClientProvider.of(client))); - } - - private void readOneSegment( - Function>>, Read>>> - fn) { MockData mockData = new MockData(range(0, 10)); mockData.mockScan(10, client); // 1 scan iteration Read>> read = dynamoDbRead(1); - PCollection>> actual = pipeline.apply(fn.apply(read)); + PCollection>> actual = pipeline.apply(read); PAssert.that(actual.apply(Count.globally())).containsInAnyOrder(1L); PAssert.that(actual).containsInAnyOrder(mockData.getAllItems()); @@ -132,24 +111,11 @@ public void testReadWithCustomLimit() { @Test public void testReadThreeSegments() { - readThreeSegments(identity()); - } - - @Test - public void testReadThreeSegmentsWithLegacyProvider() { - MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, null); - readThreeSegments( - read -> read.withDynamoDbClientProvider(StaticDynamoDBClientProvider.of(client))); - } - - private void readThreeSegments( - Function>>, Read>>> - fn) { MockData mockData = new MockData(range(0, 10), range(10, 20), range(20, 30)); mockData.mockScan(10, client); // 1 scan iteration per segment Read>> read = dynamoDbRead(3); - PCollection>> actual = pipeline.apply(fn.apply(read)); + PCollection>> actual = pipeline.apply(read); PAssert.that(actual.apply(Count.globally())).containsInAnyOrder(3L); PAssert.that(actual.apply(Flatten.iterables())).containsInAnyOrder(mockData.getAllItems()); @@ -197,29 +163,6 @@ public void testReadInvalidTotalSegments() { pipeline.apply(dynamoDbRead(0)); } - @Test - public void testBuildWithCredentialsProviderAndRegion() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Read read = - DynamoDBIO.read().withDynamoDbClientProvider(credentialsProvider, region.id()); - assertThat(read.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithCredentialsProviderAndRegionAndEndpoint() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - URI endpoint = URI.create("localhost:9999"); - - Read read = - DynamoDBIO.read().withDynamoDbClientProvider(credentialsProvider, region.id(), endpoint); - assertThat(read.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, endpoint)); - } - private static class MockData { private final List> data; diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest.java index b410ec916d6d..10959341499e 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.aws2.dynamodb; -import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static java.util.stream.IntStream.rangeClosed; @@ -32,7 +31,6 @@ import static org.mockito.Mockito.when; import java.io.Serializable; -import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,8 +42,6 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.common.RetryConfiguration; import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIO.Write; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -58,7 +54,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -69,9 +64,6 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -97,23 +89,12 @@ public void configureClientBuilderFactory() { @Test public void testWritePutItems() { - writePutItems(identity()); - } - - @Test - public void testWritePutItemsWithLegacyProvider() { - MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, null); - writePutItems( - write -> write.withDynamoDbClientProvider(StaticDynamoDBClientProvider.of(client))); - } - - private void writePutItems(Function, Write> fn) { List items = Item.range(0, 100); Supplier> capturePuts = captureBatchWrites(client, req -> req.putRequest().item()); Write write = DynamoDBIO.write().withWriteRequestMapperFn(putRequestMapper); - PCollection output = pipeline.apply(Create.of(items)).apply(fn.apply(write)); + PCollection output = pipeline.apply(Create.of(items)).apply(write); PAssert.that(output).empty(); pipeline.run().waitUntilFinish(); @@ -177,24 +158,13 @@ public void testWritePutItemsWithDuplicatesByKey() { @Test public void testWriteDeleteItems() { - writeDeleteItems(identity()); - } - - @Test - public void testWriteDeleteItemsWithLegacyProvider() { - MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, null); - writeDeleteItems( - write -> write.withDynamoDbClientProvider(StaticDynamoDBClientProvider.of(client))); - } - - private void writeDeleteItems(Function, Write> fn) { List items = Item.range(0, 100); Supplier> captureDeletes = captureBatchWrites(client, req -> req.deleteRequest().key()); Write write = DynamoDBIO.write().withWriteRequestMapperFn(deleteRequestMapper); - PCollection output = pipeline.apply(Create.of(items)).apply(fn.apply(write)); + PCollection output = pipeline.apply(Create.of(items)).apply(write); PAssert.that(output).empty(); pipeline.run().waitUntilFinish(); @@ -251,45 +221,6 @@ public void testWritePutItemsWithPartialSuccess() { ordered.verify(client).batchWriteItem(argThat(matchWritesUnordered(writes.subList(8, 10)))); } - @Test - public void testBuildWithCredentialsProviderAndRegion() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Write write = - DynamoDBIO.write().withDynamoDbClientProvider(credentialsProvider, region.id()); - assertThat(write.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithCredentialsProviderAndRegionAndEndpoint() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - URI endpoint = URI.create("localhost:9999"); - - Write write = - DynamoDBIO.write().withDynamoDbClientProvider(credentialsProvider, region.id(), endpoint); - assertThat(write.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, endpoint)); - } - - @Test - public void testBuildWithRetryConfig() { - // sum up user level and sdk level retries - DynamoDBIO.RetryConfiguration retryConfig = - DynamoDBIO.RetryConfiguration.builder() - .setMaxAttempts(3) - .setMaxDuration(Duration.ZERO) - .build(); - Write write = DynamoDBIO.write().withRetryConfiguration(retryConfig); - assertThat(write.getClientConfiguration()) - .isEqualTo( - ClientConfiguration.builder() - .retry(RetryConfiguration.builder().numRetries(8).build()) - .build()); - } - @DefaultCoder(AvroCoder.class) static class Item implements Serializable { Map entries; diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/StaticDynamoDBClientProvider.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/StaticDynamoDBClientProvider.java deleted file mode 100644 index c80ebaa28826..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/StaticDynamoDBClientProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.dynamodb; - -import org.apache.beam.sdk.io.aws2.StaticSupplier; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; - -/** Client provider supporting unserializable clients such as mock instances for unit tests. */ -class StaticDynamoDBClientProvider - extends StaticSupplier - implements DynamoDbClientProvider { - static DynamoDbClientProvider of(DynamoDbClient client) { - return new StaticDynamoDBClientProvider().withObject(client); - } - - @Override - public DynamoDbClient getDynamoDbClient() { - return get(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/BasicKinesisClientProviderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/BasicKinesisClientProviderTest.java deleted file mode 100644 index 67eb255f7f3f..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/BasicKinesisClientProviderTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.kinesis; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.util.SerializableUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; - -/** Tests on {@link org.apache.beam.sdk.io.aws2.kinesis.BasicKinesisProvider}. */ -@RunWith(JUnit4.class) -public class BasicKinesisClientProviderTest { - - @Test - public void testSerialization() { - AwsCredentialsProvider awsCredentialsProvider = - StaticCredentialsProvider.create( - AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")); - - BasicKinesisProvider kinesisProvider = - new BasicKinesisProvider(awsCredentialsProvider, Region.AP_EAST_1, null); - - byte[] serializedBytes = SerializableUtils.serializeToByteArray(kinesisProvider); - - BasicKinesisProvider kinesisProviderDeserialized = - (BasicKinesisProvider) - SerializableUtils.deserializeFromByteArray(serializedBytes, "Basic Kinesis Provider"); - - assertEquals(kinesisProvider, kinesisProviderDeserialized); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java index 25760524fe14..aa43f6e95bd7 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java @@ -32,12 +32,10 @@ import static org.mockito.Mockito.when; import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON; -import java.net.URI; import java.util.List; import java.util.function.Function; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; -import org.apache.beam.sdk.io.aws2.StaticSupplier; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read; import org.apache.beam.sdk.testing.PAssert; @@ -54,11 +52,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; @@ -73,9 +66,6 @@ /** Tests for {@link KinesisIO#read}. */ @RunWith(MockitoJUnitRunner.class) public class KinesisIOReadTest { - private static final String KEY = "key"; - private static final String SECRET = "secret"; - private static final int SHARDS = 3; @Rule public final transient TestPipeline p = TestPipeline.create(); @@ -145,17 +135,6 @@ public void testReadWithEFOFromShards() { p.run(); } - @Test - public void testReadFromShardsWithLegacyProvider() { - List> records = createRecords(SHARDS, SHARD_EVENTS); - mockShards(client, SHARDS); - mockShardIterators(client, records); - mockRecords(client, records, 10); - - MockClientBuilderFactory.set(p, KinesisClientBuilder.class, null); - readFromShards(read -> read.withAWSClientsProvider(Provider.of(client)), concat(records)); - } - @Test(expected = PipelineExecutionException.class) public void testReadWithLimitExceeded() { when(client.listShards(any(ListShardsRequest.class))) @@ -178,57 +157,6 @@ private void readFromShards(Function fn, Iterable expected) p.run(); } - @Test - public void testBuildWithBasicCredentials() { - Region region = Region.US_EAST_1; - AwsBasicCredentials credentials = AwsBasicCredentials.create(KEY, SECRET); - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials); - - Read read = KinesisIO.read().withAWSClientsProvider(KEY, SECRET, region); - - assertThat(read.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithCredentialsProvider() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Read read = KinesisIO.read().withAWSClientsProvider(credentialsProvider, region); - - assertThat(read.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithBasicCredentialsAndCustomEndpoint() { - String customEndpoint = "localhost:9999"; - Region region = Region.US_WEST_1; - AwsBasicCredentials credentials = AwsBasicCredentials.create("key", "secret"); - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials); - - Read read = KinesisIO.read().withAWSClientsProvider(KEY, SECRET, region, customEndpoint); - - assertThat(read.getClientConfiguration()) - .isEqualTo( - ClientConfiguration.create(credentialsProvider, region, URI.create(customEndpoint))); - } - - @Test - public void testBuildWithCredentialsProviderAndCustomEndpoint() { - String customEndpoint = "localhost:9999"; - Region region = Region.US_WEST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Read read = - KinesisIO.read().withAWSClientsProvider(credentialsProvider, region, customEndpoint); - - assertThat(read.getClientConfiguration()) - .isEqualTo( - ClientConfiguration.create(credentialsProvider, region, URI.create(customEndpoint))); - } - static class ToRecord extends DoFn { @ProcessElement public void processElement(@Element KinesisRecord rec, OutputReceiver out) { @@ -236,21 +164,4 @@ public void processElement(@Element KinesisRecord rec, OutputReceiver ou out.output(record(arrival, rec.getDataAsBytes(), rec.getSequenceNumber())); } } - - static class Provider extends StaticSupplier - implements AWSClientsProvider { - static AWSClientsProvider of(KinesisClient client) { - return new Provider().withObject(client); - } - - @Override - public KinesisClient getKinesisClient() { - return get(); - } - - @Override - public CloudWatchClient getCloudWatchClient() { - return mock(CloudWatchClient.class); - } - } } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSourceTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSourceTest.java index 7b72be2e5bbb..7eed15210454 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSourceTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSourceTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; import org.apache.beam.sdk.io.aws2.options.AwsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -38,12 +39,16 @@ import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.kinesis.common.InitialPositionInStream; @RunWith(MockitoJUnitRunner.Silent.class) public class KinesisSourceTest { + @Mock private KinesisClient kinesisClient; + private PipelineOptions options = createOptions(); + @Test public void testCreateReaderOfCorrectType() throws Exception { KinesisIO.Read readSpec = @@ -60,26 +65,23 @@ public void testCreateReaderOfCorrectType() throws Exception { KinesisReaderCheckpoint initCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of()); UnboundedSource.UnboundedReader reader = - new KinesisSource(readSpec, initCheckpoint).createReader(opts(), null); + new KinesisSource(readSpec, initCheckpoint).createReader(options, null); assertThat(reader).isInstanceOf(KinesisReader.class); UnboundedSource.UnboundedReader efoReader = - new KinesisSource(readSpecEFO, initCheckpoint).createReader(opts(), null); + new KinesisSource(readSpecEFO, initCheckpoint).createReader(options, null); assertThat(efoReader).isInstanceOf(EFOKinesisReader.class); } - @Mock private KinesisClient kinesisClient; - private KinesisSource source; - @Test public void testSplitGeneratesCorrectNumberOfSources() throws Exception { mockShards(kinesisClient, 3); - source = sourceWithMockedKinesisClient(spec()); - assertThat(source.split(1, opts()).size()).isEqualTo(1); - assertThat(source.split(2, opts()).size()).isEqualTo(2); - assertThat(source.split(3, opts()).size()).isEqualTo(3); + KinesisSource source = sourceWithMockedKinesisClient(spec()); + assertThat(source.split(1, options).size()).isEqualTo(1); + assertThat(source.split(2, options).size()).isEqualTo(2); + assertThat(source.split(3, options).size()).isEqualTo(3); // there are only 3 shards, no more than 3 splits can be created - assertThat(source.split(4, opts()).size()).isEqualTo(3); + assertThat(source.split(4, options).size()).isEqualTo(3); } @Test @@ -96,15 +98,11 @@ public void shouldThrowServiceErrorForShardListing() { } private KinesisSource sourceWithMockedKinesisClient(KinesisIO.Read read) { - return new KinesisSource(read) { - @Override - KinesisClient createKinesisClient(KinesisIO.Read spec, PipelineOptions options) { - return kinesisClient; - } - }; + MockClientBuilderFactory.set(options, KinesisClientBuilder.class, kinesisClient); + return new KinesisSource(read); } - private PipelineOptions opts() { + private PipelineOptions createOptions() { AwsOptions options = PipelineOptionsFactory.fromArgs().as(AwsOptions.class); options.setAwsRegion(Region.AP_EAST_1); return options; @@ -120,8 +118,8 @@ private void shouldThrowShardListingError( Exception thrownException, Class expectedExceptionClass) { when(kinesisClient.listShards(any(ListShardsRequest.class))).thenThrow(thrownException); try { - source = sourceWithMockedKinesisClient(spec()); - source.split(1, opts()); + KinesisSource source = sourceWithMockedKinesisClient(spec()); + source.split(1, options); failBecauseExceptionWasNotThrown(expectedExceptionClass); } catch (Exception e) { assertThat(e).isExactlyInstanceOf(expectedExceptionClass); diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsClientProviderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsClientProviderTest.java deleted file mode 100644 index 76665a4af7cf..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsClientProviderTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.util.SerializableUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; - -/** Tests on {@link BasicSnsClientProvider}. */ -@RunWith(JUnit4.class) -public class BasicSnsClientProviderTest { - - @Test - public void testSerialization() { - AwsCredentialsProvider awsCredentialsProvider = - StaticCredentialsProvider.create( - AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")); - - BasicSnsClientProvider snsClientProvider = - new BasicSnsClientProvider(awsCredentialsProvider, "us-east-1", null); - - byte[] serializedBytes = SerializableUtils.serializeToByteArray(snsClientProvider); - - BasicSnsClientProvider snsClientProviderDeserialized = - (BasicSnsClientProvider) - SerializableUtils.deserializeFromByteArray(serializedBytes, "Aws Credentials Provider"); - - assertEquals(snsClientProvider, snsClientProviderDeserialized); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java index c61aa94c016c..1eb40569577b 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.aws2.sns; -import static java.util.function.Function.identity; import static org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -28,16 +27,11 @@ import static org.mockito.Mockito.when; import java.io.Serializable; -import java.net.URI; import java.util.List; import java.util.function.Consumer; -import java.util.function.Function; import org.apache.beam.sdk.coders.DelegateCoder; import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; -import org.apache.beam.sdk.io.aws2.StaticSupplier; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.common.RetryConfiguration; import org.apache.beam.sdk.io.aws2.sns.SnsIO.Write; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -52,9 +46,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.SnsClientBuilder; import software.amazon.awssdk.services.sns.model.InvalidParameterException; @@ -77,16 +68,6 @@ public void configureClientBuilderFactory() { @Test public void testFailOnTopicValidation() { - failOnTopicValidation(identity()); - } - - @Test - public void testFailOnTopicValidationWithLegacyProvider() { - MockClientBuilderFactory.set(p, SnsClientBuilder.class, null); - failOnTopicValidation(write -> write.withSnsClientProvider(MockProvider.of(sns))); - } - - private void failOnTopicValidation(Function, Write> fn) { PCollection input = mock(PCollection.class); when(input.getPipeline()).thenReturn(p); when(sns.getTopicAttributes(any(Consumer.class))) @@ -97,7 +78,7 @@ private void failOnTopicValidation(Function, Write> fn) { .withTopicArn(topicArn) .withPublishRequestBuilder(msg -> requestBuilder(msg, "ignore")); - assertThatThrownBy(() -> fn.apply(snsWrite).expand(input)) + assertThatThrownBy(() -> snsWrite.expand(input)) .hasMessage("Topic arn " + topicArn + " does not exist"); } @@ -116,16 +97,6 @@ public void testSkipTopicValidation() { @Test public void testWriteWithTopicArn() { - writeWithTopicArn(identity()); - } - - @Test - public void testWriteWithTopicArnWithLegacyProvider() { - MockClientBuilderFactory.set(p, SnsClientBuilder.class, null); - writeWithTopicArn(write -> write.withSnsClientProvider(MockProvider.of(sns))); - } - - private void writeWithTopicArn(Function, Write> fn) { List input = ImmutableList.of("message1", "message2"); when(sns.publish(any(PublishRequest.class))) @@ -136,7 +107,7 @@ private void writeWithTopicArn(Function, Write> fn) { .withTopicArn(topicArn) .withPublishRequestBuilder(msg -> requestBuilder(msg, "ignore")); - PCollection results = p.apply(Create.of(input)).apply(fn.apply(snsWrite)); + PCollection results = p.apply(Create.of(input)).apply(snsWrite); PAssert.that(results.apply(Count.globally())).containsInAnyOrder(2L); p.run(); @@ -148,16 +119,6 @@ private void writeWithTopicArn(Function, Write> fn) { @Test public void testWriteWithoutTopicArn() { - writeWithoutTopicArn(identity()); - } - - @Test - public void testWriteWithoutTopicArnWithLegacyProvider() { - MockClientBuilderFactory.set(p, SnsClientBuilder.class, null); - writeWithoutTopicArn(write -> write.withSnsClientProvider(MockProvider.of(sns))); - } - - private void writeWithoutTopicArn(Function, Write> fn) { List input = ImmutableList.of("message1", "message2"); when(sns.publish(any(PublishRequest.class))) @@ -166,7 +127,7 @@ private void writeWithoutTopicArn(Function, Write> fn) { Write snsWrite = SnsIO.write().withPublishRequestBuilder(msg -> requestBuilder(msg, topicArn)); - PCollection results = p.apply(Create.of(input)).apply(fn.apply(snsWrite)); + PCollection results = p.apply(Create.of(input)).apply(snsWrite); PAssert.that(results.apply(Count.globally())).containsInAnyOrder(2L); p.run(); @@ -178,50 +139,6 @@ private void writeWithoutTopicArn(Function, Write> fn) { @Test public void testWriteWithCustomCoder() { - writeWithCustomCoder(identity()); - } - - @Test - public void testWriteWithCustomCoderWithLegacyProvider() { - MockClientBuilderFactory.set(p, SnsClientBuilder.class, null); - writeWithCustomCoder(write -> write.withSnsClientProvider(MockProvider.of(sns))); - } - - @Test - public void testBuildWithCredentialsProviderAndRegion() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Write write = SnsIO.write().withSnsClientProvider(credentialsProvider, region.id()); - assertThat(write.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithRetryConfig() { - // sum up user level and sdk level retries - Write write = - SnsIO.write().withRetryConfiguration(SnsIO.RetryConfiguration.create(3, null)); - assertThat(write.getClientConfiguration()) - .isEqualTo( - ClientConfiguration.builder() - .retry(RetryConfiguration.builder().numRetries(8).build()) - .build()); - } - - @Test - public void testBuildWithCredentialsProviderAndRegionAndEndpoint() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - URI endpoint = URI.create("localhost:9999"); - - Write write = - SnsIO.write().withSnsClientProvider(credentialsProvider, region.id(), endpoint); - assertThat(write.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, endpoint)); - } - - private void writeWithCustomCoder(Function, Write> fn) { List input = ImmutableList.of("message1"); when(sns.publish(any(PublishRequest.class))) @@ -235,7 +152,7 @@ private void writeWithCustomCoder(Function, Write> fn) { .withPublishRequestBuilder(msg -> requestBuilder(msg, topicArn)) .withCoder(DelegateCoder.of(defaultPublishResponse(), countingFn, x -> x)); - PCollection results = p.apply(Create.of(input)).apply(fn.apply(snsWrite)); + PCollection results = p.apply(Create.of(input)).apply(snsWrite); PAssert.that(results.apply(Count.globally())).containsInAnyOrder(1L); p.run(); @@ -258,16 +175,4 @@ public T apply(T input) throws Exception { private static PublishRequest.Builder requestBuilder(String msg, String topic) { return PublishRequest.builder().message(msg).topicArn(topic); } - - private static class MockProvider extends StaticSupplier - implements SnsClientProvider { - static SnsClientProvider of(SnsClient client) { - return new MockProvider().withObject(client); - } - - @Override - public SnsClient getSnsClient() { - return get(); - } - } } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/BasicSqsClientProviderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/BasicSqsClientProviderTest.java deleted file mode 100644 index 0970f9de0d5c..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/BasicSqsClientProviderTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sqs; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.util.SerializableUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; - -/** Tests on {@link BasicSqsClientProvider}. */ -@RunWith(JUnit4.class) -public class BasicSqsClientProviderTest { - - @Test - public void testSerialization() { - AwsCredentialsProvider awsCredentialsProvider = - StaticCredentialsProvider.create( - AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")); - - BasicSqsClientProvider sqsClientProvider = - new BasicSqsClientProvider(awsCredentialsProvider, "us-east-1", null); - - byte[] serializedBytes = SerializableUtils.serializeToByteArray(sqsClientProvider); - - BasicSqsClientProvider sqsClientProviderDeserialized = - (BasicSqsClientProvider) - SerializableUtils.deserializeFromByteArray(serializedBytes, "Aws Credentials Provider"); - - assertEquals(sqsClientProvider, sqsClientProviderDeserialized); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOReadTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOReadTest.java index 4ef39c62c9b3..1983553c8e5b 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOReadTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOReadTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.aws2.sqs; -import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; @@ -26,12 +25,9 @@ import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP; import static software.amazon.awssdk.services.sqs.model.QueueAttributeName.VISIBILITY_TIMEOUT; -import java.net.URI; import java.util.List; import java.util.function.Consumer; -import java.util.function.Function; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.sqs.SqsIO.Read; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -47,9 +43,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.SqsClientBuilder; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; @@ -78,16 +71,6 @@ public void configureClientBuilderFactory() { @Test public void testReadOnce() { - readOnce(identity()); - } - - @Test - public void testReadOnceWithLegacyProvider() { - MockClientBuilderFactory.set(p, SqsClientBuilder.class, null); - readOnce(read -> read.withSqsClientProvider(StaticSqsClientProvider.of(sqs))); - } - - private void readOnce(Function fn) { List expected = range(0, 10).mapToObj(this::message).collect(toList()); when(sqs.receiveMessage(any(ReceiveMessageRequest.class))) @@ -101,8 +84,7 @@ private void readOnce(Function fn) { .thenReturn(DeleteMessageBatchResponse.builder().build()); PCollection result = - p.apply(fn.apply(SqsIO.read().withMaxNumRecords(expected.size()))) - .apply(ParDo.of(new ToMessage())); + p.apply(SqsIO.read().withMaxNumRecords(expected.size())).apply(ParDo.of(new ToMessage())); // all expected messages are read PAssert.that(result).containsInAnyOrder(expected); @@ -119,27 +101,6 @@ private void readOnce(Function fn) { .containsExactlyInAnyOrderElementsOf(Lists.transform(expected, Message::receiptHandle)); } - @Test - public void testBuildWithCredentialsProviderAndRegion() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Read read = SqsIO.read().withSqsClientProvider(credentialsProvider, region.id()); - assertThat(read.clientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithCredentialsProviderAndRegionAndEndpoint() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - URI endpoint = URI.create("localhost:9999"); - - Read read = SqsIO.read().withSqsClientProvider(credentialsProvider, region.id(), endpoint); - assertThat(read.clientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, endpoint)); - } - private Message message(int i) { return Message.builder() .messageId("id" + i) diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteTest.java index 0d3f3fc7f04a..738cf282adf6 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteTest.java @@ -17,20 +17,14 @@ */ package org.apache.beam.sdk.io.aws2.sqs; -import static java.util.function.Function.identity; import static java.util.stream.IntStream.range; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.net.URI; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.sqs.SqsIO.Write; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.junit.Before; @@ -39,9 +33,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.SqsClientBuilder; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; @@ -60,16 +51,6 @@ public void configureClientBuilderFactory() { @Test public void testWrite() { - write(identity()); - } - - @Test - public void testWriteWithLegacyProvider() { - MockClientBuilderFactory.set(p, SqsClientBuilder.class, null); - write(write -> write.withSqsClientProvider(StaticSqsClientProvider.of(sqs))); - } - - private void write(Function fn) { when(sqs.sendMessage(any(SendMessageRequest.class))) .thenReturn(SendMessageResponse.builder().build()); @@ -79,30 +60,9 @@ private void write(Function fn) { .mapToObj(i -> builder.messageBody("test" + i).build()) .collect(Collectors.toList()); - p.apply(Create.of(messages)).apply(fn.apply(SqsIO.write())); + p.apply(Create.of(messages)).apply(SqsIO.write()); p.run().waitUntilFinish(); messages.forEach(msg -> verify(sqs).sendMessage(msg)); } - - @Test - public void testBuildWithCredentialsProviderAndRegion() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - - Write write = SqsIO.write().withSqsClientProvider(credentialsProvider, region.id()); - assertThat(write.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, null)); - } - - @Test - public void testBuildWithCredentialsProviderAndRegionAndEndpoint() { - Region region = Region.US_EAST_1; - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); - URI endpoint = URI.create("localhost:9999"); - - Write write = SqsIO.write().withSqsClientProvider(credentialsProvider, region.id(), endpoint); - assertThat(write.getClientConfiguration()) - .isEqualTo(ClientConfiguration.create(credentialsProvider, region, endpoint)); - } } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java index 5b1ac0cbb28b..8d3adb077a82 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java @@ -33,8 +33,10 @@ import java.util.List; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; import org.apache.beam.sdk.io.aws2.options.AwsOptions; import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer.TestCaseEnv; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.CoderUtils; import org.junit.ClassRule; import org.junit.Rule; @@ -43,6 +45,7 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; /** Tests on {@link SqsUnboundedReader}. */ @RunWith(MockitoJUnitRunner.StrictStubs.class) @@ -56,7 +59,7 @@ public class SqsUnboundedReaderTest { @Mock(answer = RETURNS_DEEP_STUBS) public SqsUnboundedSource mockSource; - @Mock public AwsOptions options; + private AwsOptions options = PipelineOptionsFactory.create().as(AwsOptions.class); private void setupMessages(String... messages) { final SqsClient client = testCase.getClient(); @@ -65,7 +68,7 @@ private void setupMessages(String... messages) { client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(message)); } - when(mockSource.getRead().sqsClientProvider()).thenReturn(StaticSqsClientProvider.of(client)); + MockClientBuilderFactory.set(options, SqsClientBuilder.class, client); when(mockSource.getRead().queueUrl()).thenReturn(queueUrl); } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/StaticSqsClientProvider.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/StaticSqsClientProvider.java deleted file mode 100644 index 78d5409fd052..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/StaticSqsClientProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sqs; - -import org.apache.beam.sdk.io.aws2.StaticSupplier; -import software.amazon.awssdk.services.sqs.SqsClient; - -/** Client provider supporting unserializable clients such as mock instances for unit tests. */ -public class StaticSqsClientProvider extends StaticSupplier - implements SqsClientProvider { - - public static StaticSqsClientProvider of(SqsClient sqs) { - return new StaticSqsClientProvider().withObject(sqs); - } - - @Override - public SqsClient getSqsClient() { - return get(); - } -}