Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: add await replication #3658

Merged
merged 5 commits into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
import com.google.bigtable.admin.v2.DeleteTableRequest;
import com.google.bigtable.admin.v2.DropRowRangeRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.bigtable.admin.v2.GetTableRequest;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.ListTablesRequest;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest;
import com.google.cloud.bigtable.admin.v2.models.Table;
import com.google.cloud.bigtable.admin.v2.stub.BigtableTableAdminStub;
import com.google.common.annotations.VisibleForTesting;
import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -95,7 +90,7 @@
* }</pre>
*/
public final class BigtableTableAdminClient implements AutoCloseable {
private final BigtableTableAdminStub stub;
private final EnhancedBigtableTableAdminStub stub;
private final InstanceName instanceName;

/** Constructs an instance of BigtableTableAdminClient with the given instanceName. */
Expand All @@ -107,17 +102,19 @@ public static BigtableTableAdminClient create(@Nonnull InstanceName instanceName
/** Constructs an instance of BigtableTableAdminClient with the given settings. */
public static BigtableTableAdminClient create(@Nonnull BigtableTableAdminSettings settings)
throws IOException {
return create(settings.getInstanceName(), settings.getStubSettings().createStub());
EnhancedBigtableTableAdminStub stub = EnhancedBigtableTableAdminStub
.createEnhanced(settings.getStubSettings());
return create(settings.getInstanceName(), stub);
}

/** Constructs an instance of BigtableTableAdminClient with the given instanceName and stub. */
public static BigtableTableAdminClient create(@Nonnull InstanceName instanceName,
@Nonnull BigtableTableAdminStub stub) {
@Nonnull EnhancedBigtableTableAdminStub stub) {
return new BigtableTableAdminClient(instanceName, stub);
}

private BigtableTableAdminClient(@Nonnull InstanceName instanceName,
@Nonnull BigtableTableAdminStub stub) {
@Nonnull EnhancedBigtableTableAdminStub stub) {
Preconditions.checkNotNull(instanceName);
Preconditions.checkNotNull(stub);
this.instanceName = instanceName;
Expand Down Expand Up @@ -602,91 +599,57 @@ public ApiFuture<Void> dropAllRowsAsync(String tableId) {
}

/**
* Generates a token to verify the replication status of table mutations invoked before this call.
* Token expires in 90 days
* Blocks until replication has caught up to the point this method was called. This allows callers
* to make sure that their mutations have been replicated across all of their clusters.
*
* <p>Sample code:
* <p>Sample code
*
* <pre>{@code
* ConsistencyToken consistencyToken = client.generateConsistencyToken("my-table");
* client.awaitReplication("my-table");
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ConsistencyToken generateConsistencyToken(String tableId) {
return awaitFuture(generateConsistencyTokenAsync(tableId));
}

/**
* Asynchornously generates a token to verify the replication status of table mutations invoked
* before this call. Token expires in 90 days
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<ConsistencyToken> consistencyTokenFuture = client.generateConsistencyToken("my-table");
* }</pre>
* @throws com.google.api.gax.retrying.PollException when polling exceeds the total timeout
*/
// TODO(igorbernstein2): add sample code for waiting for the fetch consistency token
@SuppressWarnings("WeakerAccess")
public ApiFuture<ConsistencyToken> generateConsistencyTokenAsync(final String tableId) {
GenerateConsistencyTokenRequest request = GenerateConsistencyTokenRequest.newBuilder()
.setName(getTableName(tableId))
.build();

return ApiFutures.transform(
stub.generateConsistencyTokenCallable().futureCall(request),
new ApiFunction<GenerateConsistencyTokenResponse, ConsistencyToken>() {
@Override
public ConsistencyToken apply(GenerateConsistencyTokenResponse proto) {
TableName tableName = TableName
.of(instanceName.getProject(), instanceName.getInstance(), tableId);
return ConsistencyToken.of(tableName, proto.getConsistencyToken());
}
},
MoreExecutors.directExecutor());
public void awaitReplication(String tableId) {
TableName tableName = TableName
.of(instanceName.getProject(), instanceName.getInstance(), tableId);
awaitFuture(stub.awaitReplicationCallable().futureCall(tableName));
}

/**
* Checks replication consistency for the specified token consistency token
* Returns a future that is resolved when replication has caught up to the point this method was
* called. This allows callers to make sure that their mutations have been replicated across all
* of their clusters.
*
* <p>Sample code:
*
* <pre>{@code
* try(BigtableTableAdminClient client = BigtableTableAdminClient.create(InstanceName.of("[PROJECT]", "[INSTANCE]"))) {
* // Perform some mutations.
* ApiFuture<Void> replicationFuture = client.awaitReplicationAsync("my-table");
*
* ConsistencyToken token = client.generateConsistencyToken("table-id");
* while(!client.isConsistent(token)) {
* Thread.sleep(100);
* }
* ApiFutures.addCallback(
* replicationFuture,
* new ApiFutureCallback<Void>() {
* public void onSuccess(Table table) {
* System.out.println("All clusters are now consistent");
* }
*
* public void onFailure(Throwable t) {
* t.printStackTrace();
* }
* },
* MoreExecutors.directExecutor()
* );
*
* // Now all clusters are consistent
* }
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public boolean isConsistent(ConsistencyToken token) {
return awaitFuture(isConsistentAsync(token));
public ApiFuture<Void> awaitReplicationAsync(final String tableId) {
TableName tableName = TableName
.of(instanceName.getProject(), instanceName.getInstance(), tableId);
return stub.awaitReplicationCallable().futureCall(tableName);
}

@VisibleForTesting
ApiFuture<Boolean> isConsistentAsync(ConsistencyToken token) {
ApiFuture<CheckConsistencyResponse> checkConsResp = stub.checkConsistencyCallable()
.futureCall(token.toProto(instanceName));

return ApiFutures.transform(
checkConsResp,
new ApiFunction<CheckConsistencyResponse, Boolean>() {
@Override
public Boolean apply(CheckConsistencyResponse input) {
return input.getConsistent();
}
},
MoreExecutors.directExecutor());
}

// TODO(igorbernstein2): add awaitConsist() & awaitConsistAsync() that generate & poll a token

/**
* Helper method to construct the table name in format: projects/{project}/instances/{instance}/tables/{tableId}
*/
Expand Down

This file was deleted.

Loading