Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable instream retry for default streams when Multiplexing. #2376

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class ConnectionWorkerPool {
*/
private final java.time.Duration maxRetryDuration;

/*
* Retry settings for in-stream retries.
*/
private RetrySettings retrySettings;

/*
Expand Down Expand Up @@ -208,7 +211,8 @@ public abstract static class Builder {
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
@Nullable String comperssorName,
BigQueryWriteSettings clientSettings) {
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
Expand All @@ -217,8 +221,7 @@ public abstract static class Builder {
this.compressorName = comperssorName;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
// In-stream retry is not enabled for multiplexing.
this.retrySettings = null;
this.retrySettings = retrySettings;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,6 @@ private StreamWriter(Builder builder) throws IOException {
"Trying to enable connection pool in non-default stream.");
}

if (builder.retrySettings != null) {
log.warning("Retry settings is only allowed when connection pool is not enabled.");
throw new IllegalArgumentException(
"Trying to enable connection pool while providing retry settings.");
}

// We need a client to perform some getWriteStream calls.
BigQueryWriteClient client =
builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings);
Expand Down Expand Up @@ -295,7 +289,8 @@ private StreamWriter(Builder builder) throws IOException {
builder.limitExceededBehavior,
builder.traceId,
builder.compressorName,
client.getSettings());
client.getSettings(),
builder.retrySettings);
}));
validateFetchedConnectonPool(builder);
// If the client is not from outside, then shutdown the client we created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -58,6 +59,17 @@ public class ConnectionWorkerPoolTest {
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default";
private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
private static final long INITIAL_RETRY_MILLIS = 500;
private static final double RETRY_MULTIPLIER = 1.3;
private static final int MAX_RETRY_DELAY_MINUTES = 5;
private static final RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
.setRetryDelayMultiplier(RETRY_MULTIPLIER)
.setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
.build();

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -398,6 +410,7 @@ public void testCloseExternalClient()
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("us")
.setRetrySettings(retrySettings)
.build());
}

Expand Down Expand Up @@ -483,6 +496,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
clientSettings);
clientSettings,
retrySettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,25 +536,6 @@ public void testShortenStreamNameAllowed() throws Exception {
.build();
}

@Test
public void testNoRetryWhenConnectionPoolEnabled() throws Exception {
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client)
.setEnableConnectionPool(true)
.setRetrySettings(RetrySettings.newBuilder().build())
.build();
}
});
assertTrue(
ex.getMessage()
.contains("Trying to enable connection pool while providing retry settings."));
}

@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriter writer =
Expand Down Expand Up @@ -1429,6 +1410,7 @@ public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOExce
.setMaxInflightRequests(10)
.setLocation("US")
.setMaxRetryDuration(java.time.Duration.ofMillis(100))
.setRetrySettings(retrySettings)
.build();
}

Expand Down