Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated AWS 2 client providers #26682

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

* Passing a tag into MultiProcessShared is now required in the Python SDK ([#26168](https://github.com/apache/beam/issues/26168)).
* CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)).
* AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)).

## Deprecations

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import static java.util.stream.Collectors.toList;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
Expand All @@ -43,7 +39,6 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.common.RetryConfiguration;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down Expand Up @@ -169,9 +164,7 @@ public static <T> Write<T> write() {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {

abstract @Nullable ClientConfiguration getClientConfiguration();

abstract @Nullable DynamoDbClientProvider getDynamoDbClientProvider();
abstract ClientConfiguration getClientConfiguration();

abstract @Nullable SerializableFunction<Void, ScanRequest> getScanRequestFn();

Expand All @@ -188,8 +181,6 @@ abstract static class Builder<T> {

abstract Builder<T> setClientConfiguration(ClientConfiguration config);

abstract Builder<T> setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider);

abstract Builder<T> setScanRequestFn(SerializableFunction<Void, ScanRequest> fn);

abstract Builder<T> setSegmentId(Integer segmentId);
Expand All @@ -201,49 +192,8 @@ abstract static class Builder<T> {
abstract Read<T> build();
}

/**
* @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively
* you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}.
*/
@Deprecated
public Read<T> withDynamoDbClientProvider(DynamoDbClientProvider clientProvider) {
checkArgument(clientProvider != null, "DynamoDbClientProvider cannot be null");
return toBuilder()
.setClientConfiguration(null)
.setDynamoDbClientProvider(clientProvider)
.build();
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Read<T> withDynamoDbClientProvider(
AwsCredentialsProvider credentials, String region, URI endpoint) {
return updateClientConfig(
b ->
b.credentialsProvider(credentials)
.region(Region.of(region))
.endpoint(endpoint)
.build());
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Read<T> withDynamoDbClientProvider(AwsCredentialsProvider credentials, String region) {
return updateClientConfig(
b -> b.credentialsProvider(credentials).region(Region.of(region)).build());
}

/** Configuration of DynamoDB client. */
public Read<T> withClientConfiguration(ClientConfiguration config) {
return updateClientConfig(ignore -> config);
}

private Read<T> updateClientConfig(
Function<ClientConfiguration.Builder, ClientConfiguration> fn) {
checkState(
getDynamoDbClientProvider() == null,
"Legacy DynamoDbClientProvider is set, but incompatible with ClientConfiguration.");
ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder());
checkArgument(config != null, "ClientConfiguration cannot be null");
return toBuilder().setClientConfiguration(config).build();
}
Expand Down Expand Up @@ -288,11 +238,8 @@ public PCollection<T> expand(PBegin input) {
(scanRequest.totalSegments() != null && scanRequest.totalSegments() > 0),
"TotalSegments is required with withScanRequestFn() and greater zero");

if (getDynamoDbClientProvider() == null) {
checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null");
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());
}
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());

PCollection<Read<T>> splits =
input.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn<>()));
Expand Down Expand Up @@ -320,10 +267,6 @@ public void processElement(@Element Read<T> spec, OutputReceiver<Read<T>> out) {
/** A {@link DoFn} executing the ScanRequest to read from DynamoDb. */
private static class ReadFn<T> extends DoFn<Read<T>, T> {
private DynamoDbClient buildClient(Read<T> spec, AwsOptions opts) {
if (spec.getDynamoDbClientProvider() != null) {
// build client using legacy DynamoDbClientProvider
return spec.getDynamoDbClientProvider().getDynamoDbClient();
}
return ClientBuilderFactory.buildClient(
opts, DynamoDbClient.builder(), spec.getClientConfiguration());
}
Expand Down Expand Up @@ -364,61 +307,11 @@ public List<Map<String, AttributeValue>> apply(@Nullable ScanResponse scanRespon
}
}

/**
* Legacy retry configuration.
*
* <p><b>Warning</b>: Max accumulative retry latency is silently ignored as it is not supported by
* the AWS SDK.
*
* @deprecated Use {@link org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to
* delegate retries to the AWS SDK.
*/
@AutoValue
@Deprecated
public abstract static class RetryConfiguration implements Serializable {
abstract int getMaxAttempts();

abstract Duration getMaxDuration();

public static Builder builder() {
return new AutoValue_DynamoDBIO_RetryConfiguration.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setMaxAttempts(int maxAttempts);

/**
* @deprecated <b>Warning</b>, max accumulative retry latency is silently ignored as it is not
* supported by the AWS SDK.
*/
@Deprecated
public abstract Builder setMaxDuration(Duration maxDuration);

abstract RetryConfiguration autoBuild();

public RetryConfiguration build() {
RetryConfiguration config = autoBuild();
checkArgument(config.getMaxAttempts() > 0, "maxAttempts should be greater than 0");
return config;
}
}

org.apache.beam.sdk.io.aws2.common.RetryConfiguration convertLegacyConfig() {
int totalAttempts = getMaxAttempts() * 3; // 3 SDK attempts per user attempt
return org.apache.beam.sdk.io.aws2.common.RetryConfiguration.builder()
.numRetries(totalAttempts - 1)
.build();
}
}

/** Write a PCollection<T> data into DynamoDB. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PCollection<Void>> {

abstract @Nullable ClientConfiguration getClientConfiguration();

abstract @Nullable DynamoDbClientProvider getDynamoDbClientProvider();
abstract ClientConfiguration getClientConfiguration();

abstract @Nullable SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn();

Expand All @@ -430,8 +323,6 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PCollec
abstract static class Builder<T> {
abstract Builder<T> setClientConfiguration(ClientConfiguration config);

abstract Builder<T> setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider);

abstract Builder<T> setWriteItemMapperFn(
SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn);

Expand All @@ -440,65 +331,12 @@ abstract Builder<T> setWriteItemMapperFn(
abstract Write<T> build();
}

/**
* @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively
* you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}.
*/
@Deprecated
public Write<T> withDynamoDbClientProvider(DynamoDbClientProvider clientProvider) {
checkArgument(clientProvider != null, "DynamoDbClientProvider cannot be null");
return toBuilder()
.setClientConfiguration(null)
.setDynamoDbClientProvider(clientProvider)
.build();
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Write<T> withDynamoDbClientProvider(
AwsCredentialsProvider credentials, String region, URI endpoint) {
return updateClientConfig(
b ->
b.credentialsProvider(credentials)
.region(Region.of(region))
.endpoint(endpoint)
.build());
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Write<T> withDynamoDbClientProvider(AwsCredentialsProvider credentials, String region) {
return updateClientConfig(
b -> b.credentialsProvider(credentials).region(Region.of(region)).build());
}

/** Configuration of DynamoDB client. */
public Write<T> withClientConfiguration(ClientConfiguration config) {
return updateClientConfig(ignore -> config);
}

private Write<T> updateClientConfig(
Function<ClientConfiguration.Builder, ClientConfiguration> fn) {
checkState(
getDynamoDbClientProvider() == null,
"Legacy DynamoDbClientProvider is set, but incompatible with ClientConfiguration.");
ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder());
checkArgument(config != null, "ClientConfiguration cannot be null");
return toBuilder().setClientConfiguration(config).build();
}

/**
* Retry configuration of DynamoDB client.
*
* @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} with {@link
* org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to delegate retries to the
* AWS SDK.
*/
@Deprecated
public Write<T> withRetryConfiguration(RetryConfiguration retry) {
return updateClientConfig(b -> b.retry(retry.convertLegacyConfig()).build());
}

public Write<T> withWriteRequestMapperFn(
SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn) {
return toBuilder().setWriteItemMapperFn(writeItemMapperFn).build();
Expand All @@ -510,11 +348,9 @@ public Write<T> withDeduplicateKeys(List<String> deduplicateKeys) {

@Override
public PCollection<Void> expand(PCollection<T> input) {
if (getDynamoDbClientProvider() == null) {
checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null");
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());
}
checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null");
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());

return input.apply(ParDo.of(new WriteFn<>(this)));
}
Expand Down Expand Up @@ -544,14 +380,8 @@ static class WriteFn<T> extends DoFn<T, Void> {
@Setup
public void setup(PipelineOptions options) {
ClientConfiguration clientConfig = spec.getClientConfiguration();
if (spec.getDynamoDbClientProvider() != null) {
// build client using legacy DynamoDbClientProvider
client = spec.getDynamoDbClientProvider().getDynamoDbClient();
} else {
AwsOptions awsOpts = options.as(AwsOptions.class);
client =
ClientBuilderFactory.buildClient(awsOpts, DynamoDbClient.builder(), clientConfig);
}
AwsOptions awsOpts = options.as(AwsOptions.class);
client = ClientBuilderFactory.buildClient(awsOpts, DynamoDbClient.builder(), clientConfig);

// resume from partial failures
resumeBackoff = FluentBackoff.DEFAULT.withMaxRetries(BATCH_SIZE);
Expand Down
Loading