From 827cdfa174814f95781beebafed206730a68e775 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 4 Sep 2018 16:01:29 -0400 Subject: [PATCH 1/3] Bigtable: improve list tables spooler Avoid blocking the event loop. Previously the first page would be fetched asynchronously, but all of the other pages would be fetched synchronously which would block the grpc event loop. The new implementation uses future chaining. --- .../admin/v2/BigtableTableAdminClient.java | 70 ++++++++++++++++--- .../v2/BigtableTableAdminClientTest.java | 59 ++++++++++------ 2 files changed, 100 insertions(+), 29 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index 16b70188afdd..88068e637c3d 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -15,7 +15,10 @@ */ package com.google.cloud.bigtable.admin.v2; +import com.google.api.core.ApiAsyncFunction; +import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.util.ArrayList; @@ -32,7 +35,6 @@ import com.google.bigtable.admin.v2.GetTableRequest; import com.google.bigtable.admin.v2.InstanceName; import com.google.bigtable.admin.v2.ListTablesRequest; -import com.google.bigtable.admin.v2.ListTablesResponse; import com.google.bigtable.admin.v2.TableName; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest; @@ -327,18 +329,68 @@ public List listTables() { * } */ public ApiFuture> listTablesAsync() { - ApiFuture listResp = - this.stub.listTablesPagedCallable().futureCall(composeListTableRequest()); + ListTablesRequest request = ListTablesRequest.newBuilder().setParent(instanceName.toString()) + .build(); - return ApiFutures.transform( - listResp, - new ApiFunction>() { + // TODO(igorbernstein2): try to upstream pagination spooling or figure out a way to expose the + // paginated responses while maintaining the wrapper facade. + + // Fetch the first page. + ApiFuture firstPageFuture = ApiFutures.transform( + stub.listTablesPagedCallable().futureCall(request), + new ApiFunction() { @Override - public List apply(ListTablesPagedResponse input) { - return convertToTableNames(input.iterateAll()); + public ListTablesPage apply(ListTablesPagedResponse response) { + return response.getPage(); } }, - MoreExecutors.directExecutor()); + MoreExecutors.directExecutor() + ); + + // Fetch the rest of the pages by chaining the futures. + ApiFuture> allProtos = ApiFutures + .transformAsync( + firstPageFuture, + new ApiAsyncFunction>() { + List responseAccumulator = Lists + .newArrayList(); + + @Override + public ApiFuture> apply( + ListTablesPage page) { + // Add all entries from the page + responseAccumulator.addAll(Lists.newArrayList(page.getValues())); + + // If this is the last page, just return the accumulated responses. + if (!page.hasNextPage()) { + return ApiFutures.immediateFuture(responseAccumulator); + } + + // Otherwise fetch the next page. + return ApiFutures.transformAsync( + page.getNextPageAsync(), + this, + MoreExecutors.directExecutor() + ); + } + }, + MoreExecutors.directExecutor() + ); + + // Wrap all of the accumulated protos. + return ApiFutures.transform(allProtos, + new ApiFunction, List>() { + @Override + public List apply(List protos) { + List results = Lists.newArrayListWithCapacity(protos.size()); + for (com.google.bigtable.admin.v2.Table proto : protos) { + results.add(TableName.parse(proto.getName())); + } + return results; + } + }, + MoreExecutors.directExecutor() + ); } /** diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java index 7ef43120597c..85f8753eca74 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java @@ -33,6 +33,7 @@ import com.google.bigtable.admin.v2.ListTablesRequest; import com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest.Modification; import com.google.bigtable.admin.v2.TableName; +import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; @@ -327,34 +328,52 @@ public void testGetTableAsync() throws Exception { @Test public void testListTables() { // Setup - ListTablesRequest expectedRequest = ListTablesRequest.newBuilder() - .setParent(INSTANCE_NAME.toString()) - .build(); - - ListTablesPagedResponse expectedResponseWrapper = Mockito.mock(ListTablesPagedResponse.class); + com.google.bigtable.admin.v2.ListTablesRequest expectedRequest = + com.google.bigtable.admin.v2.ListTablesRequest.newBuilder() + .setParent(INSTANCE_NAME.toString()) + .build(); - Iterable expectedResults = Lists.newArrayList( - com.google.bigtable.admin.v2.Table.newBuilder() - .setName(TABLE_NAME.toString() + "1") - .build(), - com.google.bigtable.admin.v2.Table.newBuilder() - .setName(TABLE_NAME.toString() + "2") - .build()); + // 3 Tables spread across 2 pages + List expectedProtos = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + expectedProtos.add( + com.google.bigtable.admin.v2.Table.newBuilder() + .setName(TABLE_NAME.toString() + i) + .build()); + } + // 2 on the first page + ListTablesPage page0 = Mockito.mock(ListTablesPage.class); + Mockito.when(page0.getValues()).thenReturn(expectedProtos.subList(0, 2)); + Mockito.when(page0.getNextPageToken()).thenReturn("next-page"); + Mockito.when(page0.hasNextPage()).thenReturn(true); + + // 1 on the last page + ListTablesPage page1 = Mockito.mock(ListTablesPage.class); + Mockito.when(page1.getValues()).thenReturn(expectedProtos.subList(2, 3)); + + // Link page0 to page1 + Mockito.when(page0.getNextPageAsync()).thenReturn( + ApiFutures.immediateFuture(page1) + ); - Mockito.when(mockListTableCallable.futureCall(expectedRequest)) - .thenReturn(ApiFutures.immediateFuture(expectedResponseWrapper)); + // Link page to the response + ListTablesPagedResponse response0 = Mockito.mock(ListTablesPagedResponse.class); + Mockito.when(response0.getPage()).thenReturn(page0); - Mockito.when(expectedResponseWrapper.iterateAll()) - .thenReturn(expectedResults); + Mockito.when(mockListTableCallable.futureCall(expectedRequest)).thenReturn( + ApiFutures.immediateFuture(response0) + ); // Execute List actualResults = adminClient.listTables(); // Verify - assertThat(actualResults).containsExactly( - TableName.parse(TABLE_NAME.toString() + "1"), - TableName.parse(TABLE_NAME.toString() + "2") - ); + List expectedResults = Lists.newArrayList(); + for (com.google.bigtable.admin.v2.Table expectedProto : expectedProtos) { + expectedResults.add(TableName.parse(expectedProto.getName())); + } + + assertThat(actualResults).containsExactlyElementsIn(expectedResults); } @Test From c4810004c507fcd73aada30134e67659c781f0db Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 4 Sep 2018 17:24:15 -0400 Subject: [PATCH 2/3] update async test as well --- .../v2/BigtableTableAdminClientTest.java | 56 ++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java index 85f8753eca74..264d2922201f 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java @@ -379,34 +379,52 @@ public void testListTables() { @Test public void testListTablesAsync() throws Exception { // Setup - ListTablesRequest expectedRequest = ListTablesRequest.newBuilder() - .setParent(INSTANCE_NAME.toString()) - .build(); + com.google.bigtable.admin.v2.ListTablesRequest expectedRequest = + com.google.bigtable.admin.v2.ListTablesRequest.newBuilder() + .setParent(INSTANCE_NAME.toString()) + .build(); - ListTablesPagedResponse expectedResponseWrapper = Mockito.mock(ListTablesPagedResponse.class); + // 3 Tables spread across 2 pages + List expectedProtos = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + expectedProtos.add( + com.google.bigtable.admin.v2.Table.newBuilder() + .setName(TABLE_NAME.toString() + i) + .build()); + } + // 2 on the first page + ListTablesPage page0 = Mockito.mock(ListTablesPage.class); + Mockito.when(page0.getValues()).thenReturn(expectedProtos.subList(0, 2)); + Mockito.when(page0.getNextPageToken()).thenReturn("next-page"); + Mockito.when(page0.hasNextPage()).thenReturn(true); - Iterable expectedResults = Lists.newArrayList( - com.google.bigtable.admin.v2.Table.newBuilder() - .setName(TABLE_NAME.toString() + "1") - .build(), - com.google.bigtable.admin.v2.Table.newBuilder() - .setName(TABLE_NAME.toString() + "2") - .build()); + // 1 on the last page + ListTablesPage page1 = Mockito.mock(ListTablesPage.class); + Mockito.when(page1.getValues()).thenReturn(expectedProtos.subList(2, 3)); + + // Link page0 to page1 + Mockito.when(page0.getNextPageAsync()).thenReturn( + ApiFutures.immediateFuture(page1) + ); - Mockito.when(mockListTableCallable.futureCall(expectedRequest)) - .thenReturn(ApiFutures.immediateFuture(expectedResponseWrapper)); + // Link page to the response + ListTablesPagedResponse response0 = Mockito.mock(ListTablesPagedResponse.class); + Mockito.when(response0.getPage()).thenReturn(page0); - Mockito.when(expectedResponseWrapper.iterateAll()) - .thenReturn(expectedResults); + Mockito.when(mockListTableCallable.futureCall(expectedRequest)).thenReturn( + ApiFutures.immediateFuture(response0) + ); // Execute ApiFuture> actualResults = adminClient.listTablesAsync(); // Verify - assertThat(actualResults.get()).containsExactly( - TableName.parse(TABLE_NAME.toString() + "1"), - TableName.parse(TABLE_NAME.toString() + "2") - ); + List expectedResults = Lists.newArrayList(); + for (com.google.bigtable.admin.v2.Table expectedProto : expectedProtos) { + expectedResults.add(TableName.parse(expectedProto.getName())); + } + + assertThat(actualResults.get()).containsExactlyElementsIn(expectedResults); } @Test From 3535bdc578c82f647d46a214b30b511d1d241014 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 6 Sep 2018 13:42:48 -0400 Subject: [PATCH 3/3] reformat --- .../admin/v2/BigtableTableAdminClient.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index 88068e637c3d..7f7ecdf71dcb 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -101,21 +101,25 @@ public final class BigtableTableAdminClient implements AutoCloseable { private final InstanceName instanceName; /** Constructs an instance of BigtableTableAdminClient with the given instanceName. */ - public static BigtableTableAdminClient create(@Nonnull InstanceName instanceName) throws IOException { + public static BigtableTableAdminClient create(@Nonnull InstanceName instanceName) + throws IOException { return create(BigtableTableAdminSettings.newBuilder().setInstanceName(instanceName).build()); } /** Constructs an instance of BigtableTableAdminClient with the given settings. */ - public static BigtableTableAdminClient create(@Nonnull BigtableTableAdminSettings settings) throws IOException { + public static BigtableTableAdminClient create(@Nonnull BigtableTableAdminSettings settings) + throws IOException { return create(settings.getInstanceName(), settings.getStubSettings().createStub()); } /** Constructs an instance of BigtableTableAdminClient with the given instanceName and stub. */ - public static BigtableTableAdminClient create(@Nonnull InstanceName instanceName, @Nonnull BigtableTableAdminStub stub) { + public static BigtableTableAdminClient create(@Nonnull InstanceName instanceName, + @Nonnull BigtableTableAdminStub stub) { return new BigtableTableAdminClient(instanceName, stub); } - private BigtableTableAdminClient(@Nonnull InstanceName instanceName, @Nonnull BigtableTableAdminStub stub) { + private BigtableTableAdminClient(@Nonnull InstanceName instanceName, + @Nonnull BigtableTableAdminStub stub) { Preconditions.checkNotNull(instanceName); Preconditions.checkNotNull(stub); this.instanceName = instanceName; @@ -528,7 +532,8 @@ public ApiFuture generateConsistencyTokenAsync(final String ta new ApiFunction() { @Override public ConsistencyToken apply(GenerateConsistencyTokenResponse proto) { - TableName tableName = TableName.of(instanceName.getProject(), instanceName.getInstance(), tableId); + TableName tableName = TableName + .of(instanceName.getProject(), instanceName.getInstance(), tableId); return ConsistencyToken.of(tableName, proto.getConsistencyToken()); } }, @@ -576,8 +581,7 @@ public Boolean apply(CheckConsistencyResponse input) { // TODO(igorbernstein2): add awaitConsist() & awaitConsistAsync() that generate & poll a token /** - * Helper method to construct the table name in format: - * projects/{project}/instances/{instance}/tables/{tableId} + * Helper method to construct the table name in format: projects/{project}/instances/{instance}/tables/{tableId} */ @VisibleForTesting String getTableName(String tableId) { @@ -636,7 +640,8 @@ GenerateConsistencyTokenRequest composeGenerateConsistencyTokenRequest(String ta * Helper method to convert ListTablesResponse to List */ @VisibleForTesting - static List convertToTableNames(Iterable listTablesResponse) { + static List convertToTableNames( + Iterable listTablesResponse) { List tableNames = new ArrayList<>(); for (com.google.bigtable.admin.v2.Table table : listTablesResponse) { @@ -646,6 +651,7 @@ static List convertToTableNames(Iterable to ApiFuture */ @@ -680,7 +686,7 @@ public Void apply(Empty empty) { private T awaitFuture(ApiFuture future) { try { return future.get(); - } catch(Throwable t) { + } catch (Throwable t) { // TODO(igorbernstein2): figure out a better wrapper exception. throw new RuntimeException(t); }