Skip to content

Commit

Permalink
feat: improve handling when crossing async and sync boundaries (#2976)
Browse files Browse the repository at this point in the history
* feat: improve handling when crossing async and sync boundaries

Previously we would blindly uses ApiExceptions.callAndTranslateApiException and Futures.getUnchecked to await future results. This created a number of problems:
* It would bubble ApiExceptions & CheckExceptions to the surface, which is quite unexpected for an hbase client
* It would hide hbase specific IOExceptions

This PR introduces FutureUtil#unwrap, which will try to preserve the original exception. However to aid in debugging, it will augment the stacktrace to include the caller. This is a similar approach as hbase took in:
https://github.com/apache/hbase/blob/5b9940907e695e20d24968cbc725277a19ce2170/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java#L129-L140

Unfortunately we can't just call that utility because it only exists in hbase 2x, so I inlined relevant bits

* typo

* fix test case

* fix deps
  • Loading branch information
igorbernstein2 authored May 24, 2021
1 parent 68c2773 commit 0a63256
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.bigtable.hbase;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
Expand All @@ -26,6 +25,7 @@
import com.google.cloud.bigtable.hbase.adapters.HBaseRequestAdapter;
import com.google.cloud.bigtable.hbase.adapters.read.GetAdapter;
import com.google.cloud.bigtable.hbase.util.ByteStringer;
import com.google.cloud.bigtable.hbase.util.FutureUtil;
import com.google.cloud.bigtable.hbase.util.Logger;
import com.google.cloud.bigtable.hbase.wrappers.BigtableApi;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
Expand Down Expand Up @@ -162,10 +162,14 @@ public boolean exists(Get get) throws IOException {
Filters.Filter filter =
Adapters.GET_ADAPTER.buildFilter(GetAdapter.setCheckExistenceOnly(get));

return !ApiExceptions.callAndTranslateApiException(
clientWrapper.readRowAsync(
tableName.getNameAsString(), ByteStringer.wrap(get.getRow()), filter))
.isEmpty();
try {
return !FutureUtil.unwrap(
clientWrapper.readRowAsync(
tableName.getNameAsString(), ByteStringer.wrap(get.getRow()), filter))
.isEmpty();
} catch (Exception e) {
throw createRetriesExhaustedWithDetailsException(e, get);
}
}
}

Expand Down Expand Up @@ -266,9 +270,13 @@ public Result get(Get get) throws IOException {
Timer.Context ignored = metrics.getTimer.time()) {

Filters.Filter filter = Adapters.GET_ADAPTER.buildFilter(get);
return ApiExceptions.callAndTranslateApiException(
clientWrapper.readRowAsync(
tableName.getNameAsString(), ByteStringer.wrap(get.getRow()), filter));
try {
return FutureUtil.unwrap(
clientWrapper.readRowAsync(
tableName.getNameAsString(), ByteStringer.wrap(get.getRow()), filter));
} catch (Exception e) {
throw createRetriesExhaustedWithDetailsException(e, get);
}
}
}

Expand Down Expand Up @@ -454,8 +462,7 @@ private boolean checkAndMutate(final byte[] row, ConditionalRowMutation request,
throws IOException {
Span span = TRACER.spanBuilder("BigtableTable." + type).startSpan();
try (Scope scope = TRACER.withSpan(span)) {
Boolean wasApplied =
ApiExceptions.callAndTranslateApiException(clientWrapper.checkAndMutateRowAsync(request));
Boolean wasApplied = FutureUtil.unwrap(clientWrapper.checkAndMutateRowAsync(request));
return CheckAndMutateUtil.wasMutationApplied(request, wasApplied);
} catch (Throwable t) {
span.setStatus(Status.UNKNOWN);
Expand All @@ -469,7 +476,7 @@ private void mutateRow(Mutation mutation, RowMutation rowMutation, String type)
throws IOException {
Span span = TRACER.spanBuilder("BigtableTable." + type).startSpan();
try (Scope scope = TRACER.withSpan(span)) {
ApiExceptions.callAndTranslateApiException(clientWrapper.mutateRowAsync(rowMutation));
FutureUtil.unwrap(clientWrapper.mutateRowAsync(rowMutation));
} catch (Throwable t) {
span.setStatus(Status.UNKNOWN);
throw logAndCreateIOException(type, mutation.getRow(), t);
Expand All @@ -487,8 +494,7 @@ public void mutateRow(RowMutations rowMutations) throws IOException {
}
Span span = TRACER.spanBuilder("BigtableTable.mutateRow").startSpan();
try (Scope scope = TRACER.withSpan(span)) {
ApiExceptions.callAndTranslateApiException(
clientWrapper.mutateRowAsync(hbaseAdapter.adapt(rowMutations)));
FutureUtil.unwrap(clientWrapper.mutateRowAsync(hbaseAdapter.adapt(rowMutations)));
} catch (Throwable t) {
span.setStatus(Status.UNKNOWN);
throw logAndCreateIOException("mutateRow", rowMutations.getRow(), t);
Expand All @@ -504,8 +510,7 @@ public Result append(Append append) throws IOException {
Span span = TRACER.spanBuilder("BigtableTable.append").startSpan();
try (Scope scope = TRACER.withSpan(span)) {
Result response =
ApiExceptions.callAndTranslateApiException(
clientWrapper.readModifyWriteRowAsync(hbaseAdapter.adapt(append)));
FutureUtil.unwrap(clientWrapper.readModifyWriteRowAsync(hbaseAdapter.adapt(append)));
// The bigtable API will always return the mutated results. In order to maintain
// compatibility, simply return null when results were not requested.
if (append.isReturnResults()) {
Expand All @@ -528,8 +533,7 @@ public Result increment(Increment increment) throws IOException {
Span span = TRACER.spanBuilder("BigtableTable.increment").startSpan();
try (Scope scope = TRACER.withSpan(span)) {
ReadModifyWriteRow request = hbaseAdapter.adapt(increment);
return ApiExceptions.callAndTranslateApiException(
clientWrapper.readModifyWriteRowAsync(request));
return FutureUtil.unwrap(clientWrapper.readModifyWriteRowAsync(request));
} catch (Throwable t) {
span.setStatus(Status.UNKNOWN);
throw logAndCreateIOException("increment", increment.getRow(), t);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2021 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.hbase.util;

import com.google.api.core.InternalApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Helpers to handle async operations inside the hbase adapter. */
@InternalApi
public class FutureUtil {

/**
* Extract the underlying causes for the future's failure.
*
* <p>The future's and the callers stacktraces will be merged to ease debugging.
*/
// This functionality was extracted from HBase 2.x
public static <T> T unwrap(Future<T> future) throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw rethrow(e.getCause());
}
}

private static IOException rethrow(Throwable error) throws IOException {
if (error instanceof IOException) {
setStackTrace(error);
throw (IOException) error;
} else if (error instanceof RuntimeException) {
setStackTrace(error);
throw (RuntimeException) error;
} else if (error instanceof Error) {
setStackTrace(error);
throw (Error) error;
} else {
throw new IOException(error);
}
}

private static void setStackTrace(Throwable error) {
StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
StackTraceElement[] originalStackTrace = error.getStackTrace();
StackTraceElement[] newStackTrace =
new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
newStackTrace[localStackTrace.length] =
new StackTraceElement("--------Future", "get--------", null, -1);
System.arraycopy(
originalStackTrace,
0,
newStackTrace,
localStackTrace.length + 1,
originalStackTrace.length);
error.setStackTrace(newStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.cloud.bigtable.admin.v2.models.Backup;
Expand All @@ -35,14 +34,14 @@
import com.google.cloud.bigtable.grpc.BigtableInstanceName;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.adapters.admin.TableAdapter;
import com.google.cloud.bigtable.hbase.util.FutureUtil;
import com.google.cloud.bigtable.hbase.util.Logger;
import com.google.cloud.bigtable.hbase.util.ModifyTableBuilder;
import com.google.cloud.bigtable.hbase.wrappers.AdminClientWrapper;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -271,8 +270,7 @@ public HTableDescriptor[] listTables(String regex, boolean includeSysTables) thr
@Override
public TableName[] listTableNames() throws IOException {
// tablesList contains list of tableId.
List<String> tablesList =
ApiExceptions.callAndTranslateApiException(adminClientWrapper.listTablesAsync());
List<String> tablesList = FutureUtil.unwrap(adminClientWrapper.listTablesAsync());

TableName[] result = new TableName[tablesList.size()];
for (int i = 0; i < tablesList.size(); i++) {
Expand All @@ -290,8 +288,7 @@ public HTableDescriptor getTableDescriptor(TableName tableName) throws IOExcepti

try {
return TableAdapter.adapt(
ApiExceptions.callAndTranslateApiException(
adminClientWrapper.getTableAsync(tableName.getNameAsString())));
FutureUtil.unwrap(adminClientWrapper.getTableAsync(tableName.getNameAsString())));
} catch (Throwable throwable) {
if (Status.fromThrowable(throwable).getCode() == Status.Code.NOT_FOUND) {
throw new TableNotFoundException(tableName);
Expand Down Expand Up @@ -365,7 +362,7 @@ public void createTable(HTableDescriptor desc, byte[][] splitKeys) throws IOExce
*/
protected void createTable(TableName tableName, CreateTableRequest request) throws IOException {
try {
ApiExceptions.callAndTranslateApiException(adminClientWrapper.createTableAsync(request));
FutureUtil.unwrap(adminClientWrapper.createTableAsync(request));
} catch (Throwable throwable) {
throw convertToTableExistsException(tableName, throwable);
}
Expand Down Expand Up @@ -418,8 +415,7 @@ public static IOException convertToTableExistsException(
@Override
public void deleteTable(TableName tableName) throws IOException {
try {
ApiExceptions.callAndTranslateApiException(
adminClientWrapper.deleteTableAsync(tableName.getNameAsString()));
FutureUtil.unwrap(adminClientWrapper.deleteTableAsync(tableName.getNameAsString()));
} catch (Throwable throwable) {
throw new IOException(
String.format("Failed to delete table '%s'", tableName.getNameAsString()), throwable);
Expand Down Expand Up @@ -620,7 +616,7 @@ public void modifyTable(TableName tableName, HTableDescriptor newDescriptor) thr
try {
ModifyColumnFamiliesRequest request =
buildModifications(newDescriptor, getTableDescriptor(tableName)).build();
ApiExceptions.callAndTranslateApiException(adminClientWrapper.modifyFamiliesAsync(request));
FutureUtil.unwrap(adminClientWrapper.modifyFamiliesAsync(request));
} catch (Throwable throwable) {
throw new IOException(
String.format("Failed to modify table '%s'", tableName.getNameAsString()), throwable);
Expand All @@ -643,8 +639,7 @@ protected Void modifyColumns(
TableName tableName, String columnName, String modificationType, ModifyTableBuilder builder)
throws IOException {
try {
ApiExceptions.callAndTranslateApiException(
adminClientWrapper.modifyFamiliesAsync(builder.build()));
FutureUtil.unwrap(adminClientWrapper.modifyFamiliesAsync(builder.build()));
return null;
} catch (Throwable throwable) {
throw new IOException(
Expand Down Expand Up @@ -785,8 +780,7 @@ public void truncateTable(TableName tableName, boolean preserveSplits) throws IO
LOG.info("truncate will preserveSplits. The passed in variable is ignored.");
}
try {
ApiExceptions.callAndTranslateApiException(
adminClientWrapper.dropAllRowsAsync(tableName.getNameAsString()));
FutureUtil.unwrap(adminClientWrapper.dropAllRowsAsync(tableName.getNameAsString()));
} catch (Throwable throwable) {
throw new IOException(
String.format("Failed to truncate table '%s'", tableName.getNameAsString()), throwable);
Expand All @@ -803,7 +797,7 @@ public void truncateTable(TableName tableName, boolean preserveSplits) throws IO
*/
public void deleteRowRangeByPrefix(TableName tableName, byte[] prefix) throws IOException {
try {
ApiExceptions.callAndTranslateApiException(
FutureUtil.unwrap(
adminClientWrapper.dropRowRangeAsync(
tableName.getNameAsString(), ByteString.copyFrom(prefix)));
} catch (Throwable throwable) {
Expand Down Expand Up @@ -908,7 +902,7 @@ public void cloneSnapshot(String snapshotId, TableName tableName)
RestoreTableRequest request =
RestoreTableRequest.of(getBackupClusterId(), snapshotId)
.setTableId(tableName.getNameAsString());
Futures.getChecked(adminClientWrapper.restoreTableAsync(request), IOException.class);
FutureUtil.unwrap(adminClientWrapper.restoreTableAsync(request));
}

/** {@inheritDoc} */
Expand All @@ -925,9 +919,8 @@ public void deleteSnapshot(String snapshotId) throws IOException {
return;
}

Futures.getChecked(
adminClientWrapper.deleteBackupAsync(getBackupClusterName().getClusterId(), snapshotId),
IOException.class);
FutureUtil.unwrap(
adminClientWrapper.deleteBackupAsync(getBackupClusterName().getClusterId(), snapshotId));
}

protected Backup snapshotTable(String snapshotId, TableName tableName) throws IOException {
Expand All @@ -947,7 +940,7 @@ protected Backup snapshotTable(String snapshotId, TableName tableName) throws IO
Instant expireTime = Instant.now().plus(ttlSecondsForBackup, ChronoUnit.SECONDS);
request.setExpireTime(expireTime);

return Futures.getChecked(adminClientWrapper.createBackupAsync(request), IOException.class);
return FutureUtil.unwrap(adminClientWrapper.createBackupAsync(request));
}

@Override
Expand Down
Loading

0 comments on commit 0a63256

Please sign in to comment.