Skip to content

Commit 4158094

Browse files
authored
fix: update the accounting of partial batch mutations (#2149)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) - [x] Rollback plan is reviewed and LGTMed - [x] All new data plane features have a completed end to end testing plan Fixes #1494 ☕️ If you write sample code, please follow the [samples format]( https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 6945e08 commit 4158094

17 files changed

+631
-140
lines changed

google-cloud-bigtable/clirr-ignored-differences.xml

+7
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,13 @@
140140
<method>*</method>
141141
<to>*</to>
142142
</difference>
143+
<!-- change method argument type is ok because MutateRowsBatchingDescriptor is InternalApi -->
144+
<difference>
145+
<differenceType>7005</differenceType>
146+
<className>com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor</className>
147+
<method>*</method>
148+
<to>*</to>
149+
</difference>
143150
<!-- Removed methods in an internal class -->
144151
<difference>
145152
<differenceType>7002</differenceType>

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java

+4-16
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,6 @@
3333
* were part of that RPC.
3434
*/
3535
public final class MutateRowsException extends ApiException {
36-
// Synthetic status to use for this ApiException subclass.
37-
private static final StatusCode LOCAL_STATUS =
38-
new StatusCode() {
39-
@Override
40-
public Code getCode() {
41-
return Code.INTERNAL;
42-
}
43-
44-
@Override
45-
public Object getTransportCode() {
46-
return null;
47-
}
48-
};
49-
5036
private final List<FailedMutation> failedMutations;
5137

5238
/**
@@ -56,22 +42,24 @@ public Object getTransportCode() {
5642
@InternalApi
5743
public static MutateRowsException create(
5844
@Nullable Throwable rpcError,
45+
StatusCode status,
5946
@Nonnull List<FailedMutation> failedMutations,
6047
boolean retryable) {
6148
ErrorDetails errorDetails = null;
6249
if (rpcError instanceof ApiException) {
6350
errorDetails = ((ApiException) rpcError).getErrorDetails();
6451
}
6552

66-
return new MutateRowsException(rpcError, failedMutations, retryable, errorDetails);
53+
return new MutateRowsException(rpcError, status, failedMutations, retryable, errorDetails);
6754
}
6855

6956
private MutateRowsException(
7057
@Nullable Throwable rpcError,
58+
StatusCode status,
7159
@Nonnull List<FailedMutation> failedMutations,
7260
boolean retryable,
7361
@Nullable ErrorDetails errorDetails) {
74-
super(rpcError, LOCAL_STATUS, retryable, errorDetails);
62+
super(rpcError, status, retryable, errorDetails);
7563
Preconditions.checkNotNull(failedMutations);
7664
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
7765
this.failedMutations = failedMutations;

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.api.gax.rpc.UnaryCallSettings;
2929
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
3030
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
31+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
3132
import com.google.common.base.MoreObjects;
3233
import com.google.common.base.Preconditions;
3334
import java.util.Set;
@@ -57,11 +58,12 @@
5758
* @see RetrySettings for retry configuration.
5859
*/
5960
@BetaApi("This surface is likely to change as the batching surface evolves.")
60-
public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMutation, Void> {
61+
public final class BigtableBatchingCallSettings
62+
extends UnaryCallSettings<BulkMutation, MutateRowsAttemptResult> {
6163

6264
// This settings is just a simple wrapper for BatchingCallSettings to allow us to add
6365
// additional functionality.
64-
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void>
66+
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
6567
batchingCallSettings;
6668
private final boolean isLatencyBasedThrottlingEnabled;
6769
private final Long targetRpcLatencyMs;
@@ -89,7 +91,8 @@ public BatchingSettings getBatchingSettings() {
8991
}
9092

9193
/** Returns an adapter that packs and unpacks batching elements. */
92-
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescriptor() {
94+
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
95+
getBatchingDescriptor() {
9396
return batchingCallSettings.getBatchingDescriptor();
9497
}
9598

@@ -120,7 +123,8 @@ public boolean isServerInitiatedFlowControlEnabled() {
120123
}
121124

122125
static Builder newBuilder(
123-
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
126+
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
127+
batchingDescriptor) {
124128
return new Builder(batchingDescriptor);
125129
}
126130

@@ -148,9 +152,11 @@ public String toString() {
148152
* A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
149153
* {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
150154
*/
151-
public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void> {
155+
public static class Builder
156+
extends UnaryCallSettings.Builder<BulkMutation, MutateRowsAttemptResult> {
152157

153-
private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor;
158+
private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
159+
batchingDescriptor;
154160
private BatchingSettings batchingSettings;
155161
private boolean isLatencyBasedThrottlingEnabled;
156162
private Long targetRpcLatencyMs;
@@ -160,7 +166,8 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void
160166

161167
private Builder(
162168
@Nonnull
163-
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
169+
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
170+
batchingDescriptor) {
164171
this.batchingDescriptor =
165172
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
166173
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

+46-43
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@
102102
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
103103
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
104104
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
105+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
105106
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
107+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
106108
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
107109
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
108110
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
@@ -165,7 +167,8 @@ public class EnhancedBigtableStub implements AutoCloseable {
165167
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
166168
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
167169
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
168-
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
170+
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> bulkMutateRowsCallable;
171+
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
169172
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
170173
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
171174
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
@@ -368,7 +371,9 @@ public EnhancedBigtableStub(
368371
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
369372
sampleRowKeysCallable = createSampleRowKeysCallable();
370373
mutateRowCallable = createMutateRowCallable();
371-
bulkMutateRowsCallable = createBulkMutateRowsCallable();
374+
bulkMutateRowsCallable = createMutateRowsBaseCallable();
375+
externalBulkMutateRowsCallable =
376+
new MutateRowsErrorConverterUnaryCallable(bulkMutateRowsCallable);
372377
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
373378
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
374379
generateInitialChangeStreamPartitionsCallable =
@@ -665,14 +670,24 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
665670
}
666671

667672
/**
668-
* Internal helper to create the base MutateRows callable chain. The chain is responsible for
669-
* retrying individual entry in case of error.
673+
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
674+
* batching. The chain will:
670675
*
671-
* <p>NOTE: the caller is responsible for adding tracing & metrics.
676+
* <ul>
677+
* <li>Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
678+
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
679+
* been applied, are filtered from the next attempt. Also, any entries that failed with a
680+
* nontransient error, are filtered from the next attempt. This will continue until there
681+
* are no more entries or there are no more retry attempts left.
682+
* <li>Wrap batch failures in a {@link MutateRowsAttemptResult}.
683+
* <li>Add tracing & metrics.
684+
* </ul>
685+
*
686+
* This callable returns an internal type {@link MutateRowsAttemptResult}.
672687
*
673-
* @see MutateRowsRetryingCallable for more details
688+
* <p>This function should not be exposed to external users, as it could cause a data loss.
674689
*/
675-
private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
690+
private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createMutateRowsBaseCallable() {
676691
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> base =
677692
GrpcRawCallableFactory.createServerStreamingCallable(
678693
GrpcCallSettings.<MutateRowsRequest, MutateRowsResponse>newBuilder()
@@ -706,55 +721,38 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
706721
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
707722
new BigtableTracerStreamingCallable<>(convertException);
708723

709-
BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
724+
BasicResultRetryAlgorithm<MutateRowsAttemptResult> resultRetryAlgorithm;
710725
if (settings.getEnableRetryInfo()) {
711726
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
712727
} else {
713728
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
714729
}
730+
MutateRowsPartialErrorRetryAlgorithm mutateRowsPartialErrorRetryAlgorithm =
731+
new MutateRowsPartialErrorRetryAlgorithm(resultRetryAlgorithm);
715732

716-
RetryAlgorithm<Void> retryAlgorithm =
733+
RetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm =
717734
new RetryAlgorithm<>(
718-
resultRetryAlgorithm,
735+
mutateRowsPartialErrorRetryAlgorithm,
719736
new ExponentialRetryAlgorithm(
720737
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
721738

722-
RetryingExecutorWithContext<Void> retryingExecutor =
739+
RetryingExecutorWithContext<MutateRowsAttemptResult> retryingExecutor =
723740
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
741+
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> baseCallable =
742+
new MutateRowsRetryingCallable(
743+
clientContext.getDefaultCallContext(),
744+
withBigtableTracer,
745+
retryingExecutor,
746+
settings.bulkMutateRowsSettings().getRetryableCodes(),
747+
retryAlgorithm);
724748

725-
return new MutateRowsRetryingCallable(
726-
clientContext.getDefaultCallContext(),
727-
withBigtableTracer,
728-
retryingExecutor,
729-
settings.bulkMutateRowsSettings().getRetryableCodes(),
730-
retryAlgorithm);
731-
}
732-
733-
/**
734-
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
735-
* batching. The chain will:
736-
*
737-
* <ul>
738-
* <li>Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
739-
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
740-
* been applied, are filtered from the next attempt. Also, any entries that failed with a
741-
* nontransient error, are filtered from the next attempt. This will continue until there
742-
* are no more entries or there are no more retry attempts left.
743-
* <li>Wrap batch failures in a {@link
744-
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
745-
* <li>Add tracing & metrics.
746-
* </ul>
747-
*/
748-
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
749-
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();
750-
751-
UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;
749+
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> withCookie = baseCallable;
752750

753751
if (settings.getEnableRoutingCookie()) {
754752
withCookie = new CookiesUnaryCallable<>(baseCallable);
755753
}
756754

757-
UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
755+
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> flowControlCallable = null;
758756
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
759757
flowControlCallable =
760758
new DynamicFlowControlCallable(
@@ -764,16 +762,16 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
764762
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
765763
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
766764
}
767-
UnaryCallable<BulkMutation, Void> userFacing =
765+
UnaryCallable<BulkMutation, MutateRowsAttemptResult> userFacing =
768766
new BulkMutateRowsUserFacingCallable(
769767
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);
770768

771769
SpanName spanName = getSpanName("MutateRows");
772770

773-
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
771+
UnaryCallable<BulkMutation, MutateRowsAttemptResult> tracedBatcherUnaryCallable =
774772
new TracedBatcherUnaryCallable<>(userFacing);
775773

776-
UnaryCallable<BulkMutation, Void> traced =
774+
UnaryCallable<BulkMutation, MutateRowsAttemptResult> traced =
777775
new TracedUnaryCallable<>(
778776
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);
779777

@@ -1171,10 +1169,15 @@ public UnaryCallable<RowMutation, Void> mutateRowCallable() {
11711169
}
11721170

11731171
/**
1174-
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} ()} during stub
1172+
* Returns the callable chain created in {@link #createMutateRowsBaseCallable()} during stub
11751173
* construction.
11761174
*/
11771175
public UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable() {
1176+
return externalBulkMutateRowsCallable;
1177+
}
1178+
1179+
@InternalApi
1180+
public UnaryCallable<BulkMutation, MutateRowsAttemptResult> internalBulkMutateRowsCallable() {
11781181
return bulkMutateRowsCallable;
11791182
}
11801183

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.core.ApiFutures;
20+
import com.google.api.core.InternalApi;
21+
import com.google.api.gax.grpc.GrpcStatusCode;
22+
import com.google.api.gax.rpc.ApiCallContext;
23+
import com.google.api.gax.rpc.UnaryCallable;
24+
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
25+
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
26+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
27+
import com.google.common.util.concurrent.MoreExecutors;
28+
import io.grpc.Status;
29+
30+
/**
31+
* This callable converts partial batch failures into an exception. This is necessary to make sure
32+
* that the caller properly handles issues and avoids possible data loss on partial failures
33+
*/
34+
@InternalApi
35+
public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<BulkMutation, Void> {
36+
37+
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> innerCallable;
38+
39+
public MutateRowsErrorConverterUnaryCallable(
40+
UnaryCallable<BulkMutation, MutateRowsAttemptResult> callable) {
41+
this.innerCallable = callable;
42+
}
43+
44+
@Override
45+
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
46+
ApiFuture<MutateRowsAttemptResult> future = innerCallable.futureCall(request, context);
47+
return ApiFutures.transform(
48+
future,
49+
result -> {
50+
if (!result.getFailedMutations().isEmpty()) {
51+
throw MutateRowsException.create(
52+
null,
53+
GrpcStatusCode.of(Status.Code.OK),
54+
result.getFailedMutations(),
55+
result.getIsRetryable());
56+
}
57+
return null;
58+
},
59+
MoreExecutors.directExecutor());
60+
}
61+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsUserFacingCallable.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,22 @@
3030
* applications.
3131
*/
3232
@InternalApi
33-
public final class BulkMutateRowsUserFacingCallable extends UnaryCallable<BulkMutation, Void> {
34-
private final UnaryCallable<MutateRowsRequest, Void> innerCallable;
33+
public final class BulkMutateRowsUserFacingCallable
34+
extends UnaryCallable<BulkMutation, MutateRowsAttemptResult> {
35+
36+
private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;
3537
private final RequestContext requestContext;
3638

3739
public BulkMutateRowsUserFacingCallable(
38-
UnaryCallable<MutateRowsRequest, Void> innerCallable, RequestContext requestContext) {
40+
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable,
41+
RequestContext requestContext) {
3942
this.innerCallable = innerCallable;
4043
this.requestContext = requestContext;
4144
}
4245

4346
@Override
44-
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
47+
public ApiFuture<MutateRowsAttemptResult> futureCall(
48+
BulkMutation request, ApiCallContext context) {
4549
return innerCallable.futureCall(request.toProto(requestContext), context);
4650
}
4751
}

0 commit comments

Comments
 (0)