Skip to content

Commit b545557

Browse files
authored
feat: add option for retrying DML as PDML (#3480)
* feat: add option for retrying DML as PDML Adds an option to the Connection API for automatically retrying DML statements as Partitioned DML, if the DML statement fails due to exceeding the Spanner mutation limit. The retry as Partitioned DML fails if the DML statement is not suitable for Partitioned DML. The option can be enabled with the `fallback_to_partitioned_dml` connection variable. This can be set with a SQL statement like this: ``` SET FALLBACK_TO_PARTITIONED_DML = TRUE; UPDATE my_table SET active=true WHERE true; ``` The property can also be set in the connection URL and by calling the method `Connection#setFallbackToPartitionedDml(boolean)`. This option can also be used in the Spanner JDBC driver and PGAdapter, once those libraries include a version of the Spanner client that includes this change. * refactor: include the option in autocommit_dml_mode
1 parent ed0ad28 commit b545557

18 files changed

+1580
-248
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+18
Original file line numberDiff line numberDiff line change
@@ -791,4 +791,22 @@
791791
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
792792
</difference>
793793

794+
<!-- Retry DML as Partitioned DML -->
795+
<difference>
796+
<differenceType>7012</differenceType>
797+
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
798+
<method>void retryDmlAsPartitionedDmlStarting(java.util.UUID, com.google.cloud.spanner.Statement, com.google.cloud.spanner.TransactionMutationLimitExceededException)</method>
799+
</difference>
800+
<difference>
801+
<differenceType>7012</differenceType>
802+
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
803+
<method>void retryDmlAsPartitionedDmlFinished(java.util.UUID, com.google.cloud.spanner.Statement, long)</method>
804+
</difference>
805+
<difference>
806+
<differenceType>7012</differenceType>
807+
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
808+
<method>void retryDmlAsPartitionedDmlFailed(java.util.UUID, com.google.cloud.spanner.Statement, java.lang.Throwable)</method>
809+
</difference>
810+
811+
794812
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java

+20
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import static com.google.cloud.spanner.TransactionMutationLimitExceededException.isTransactionMutationLimitException;
20+
1921
import com.google.api.gax.grpc.GrpcStatusCode;
2022
import com.google.api.gax.rpc.ApiException;
23+
import com.google.api.gax.rpc.ErrorDetails;
2124
import com.google.api.gax.rpc.WatchdogTimeoutException;
2225
import com.google.cloud.spanner.SpannerException.DoNotConstructDirectly;
2326
import com.google.common.base.MoreObjects;
@@ -256,6 +259,18 @@ private static ErrorInfo extractErrorInfo(Throwable cause) {
256259
return null;
257260
}
258261

262+
static ErrorDetails extractErrorDetails(Throwable cause) {
263+
Throwable prevCause = null;
264+
while (cause != null && cause != prevCause) {
265+
if (cause instanceof ApiException) {
266+
return ((ApiException) cause).getErrorDetails();
267+
}
268+
prevCause = cause;
269+
cause = cause.getCause();
270+
}
271+
return null;
272+
}
273+
259274
/**
260275
* Creates a {@link StatusRuntimeException} that contains a {@link RetryInfo} with the specified
261276
* retry delay.
@@ -313,6 +328,11 @@ static SpannerException newSpannerExceptionPreformatted(
313328
token, message, resourceInfo, cause, apiException);
314329
}
315330
}
331+
case INVALID_ARGUMENT:
332+
if (isTransactionMutationLimitException(cause)) {
333+
return new TransactionMutationLimitExceededException(
334+
token, code, message, cause, apiException);
335+
}
316336
// Fall through to the default.
317337
default:
318338
return new SpannerException(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.SpannerExceptionFactory.extractErrorDetails;
20+
21+
import com.google.api.gax.rpc.ApiException;
22+
import com.google.api.gax.rpc.ErrorDetails;
23+
import javax.annotation.Nullable;
24+
25+
/** Exception thrown by Spanner when the transaction mutation limit has been exceeded. */
26+
public class TransactionMutationLimitExceededException extends SpannerException {
27+
private static final long serialVersionUID = 1L;
28+
29+
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
30+
TransactionMutationLimitExceededException(
31+
DoNotConstructDirectly token,
32+
ErrorCode errorCode,
33+
String message,
34+
Throwable cause,
35+
@Nullable ApiException apiException) {
36+
super(token, errorCode, /*retryable = */ false, message, cause, apiException);
37+
}
38+
39+
static boolean isTransactionMutationLimitException(Throwable cause) {
40+
if (cause == null
41+
|| cause.getMessage() == null
42+
|| !cause.getMessage().contains("The transaction contains too many mutations.")) {
43+
return false;
44+
}
45+
// Spanner includes a hint that points to the Spanner limits documentation page when the error
46+
// was that the transaction mutation limit was exceeded. We use that here to identify the error,
47+
// as there is no other specific metadata in the error that identifies it (other than the error
48+
// message).
49+
ErrorDetails errorDetails = extractErrorDetails(cause);
50+
if (errorDetails != null && errorDetails.getHelp() != null) {
51+
return errorDetails.getHelp().getLinksCount() == 1
52+
&& errorDetails
53+
.getHelp()
54+
.getLinks(0)
55+
.getDescription()
56+
.equals("Cloud Spanner limits documentation.")
57+
&& errorDetails
58+
.getHelp()
59+
.getLinks(0)
60+
.getUrl()
61+
.equals("https://cloud.google.com/spanner/docs/limits");
62+
}
63+
return false;
64+
}
65+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AbstractBaseUnitOfWork.java

+14
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.google.cloud.spanner.Struct;
4040
import com.google.cloud.spanner.Type.StructField;
4141
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
42+
import com.google.cloud.spanner.connection.ReadWriteTransaction.Builder;
4243
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
4344
import com.google.common.base.Preconditions;
4445
import com.google.common.collect.ImmutableList;
@@ -75,6 +76,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
7576
private final StatementExecutor statementExecutor;
7677
private final StatementTimeout statementTimeout;
7778
protected final String transactionTag;
79+
protected final List<TransactionRetryListener> transactionRetryListeners;
7880
protected final boolean excludeTxnFromChangeStreams;
7981
protected final RpcPriority rpcPriority;
8082
protected final Span span;
@@ -110,6 +112,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni
110112
private StatementExecutor statementExecutor;
111113
private StatementTimeout statementTimeout = new StatementTimeout();
112114
private String transactionTag;
115+
private List<TransactionRetryListener> transactionRetryListeners;
113116

114117
private boolean excludeTxnFromChangeStreams;
115118
private RpcPriority rpcPriority;
@@ -134,6 +137,16 @@ B setStatementTimeout(StatementTimeout timeout) {
134137
return self();
135138
}
136139

140+
B setTransactionRetryListeners(List<TransactionRetryListener> listeners) {
141+
Preconditions.checkNotNull(listeners);
142+
this.transactionRetryListeners = listeners;
143+
return self();
144+
}
145+
146+
boolean hasTransactionRetryListeners() {
147+
return this.transactionRetryListeners != null;
148+
}
149+
137150
B setTransactionTag(@Nullable String tag) {
138151
this.transactionTag = tag;
139152
return self();
@@ -162,6 +175,7 @@ B setSpan(@Nullable Span span) {
162175
this.statementExecutor = builder.statementExecutor;
163176
this.statementTimeout = builder.statementTimeout;
164177
this.transactionTag = builder.transactionTag;
178+
this.transactionRetryListeners = builder.transactionRetryListeners;
165179
this.excludeTxnFromChangeStreams = builder.excludeTxnFromChangeStreams;
166180
this.rpcPriority = builder.rpcPriority;
167181
this.span = Preconditions.checkNotNull(builder.span);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AutocommitDmlMode.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,18 @@
1818

1919
/** Enum used to define the behavior of DML statements in autocommit mode */
2020
public enum AutocommitDmlMode {
21+
/** TRANSACTIONAL: DML statements use a standard atomic transaction. */
2122
TRANSACTIONAL,
22-
PARTITIONED_NON_ATOMIC;
23+
/** PARTITIONED_NON_ATOMIC: DML statements use a Partitioned DML transaction. */
24+
PARTITIONED_NON_ATOMIC,
25+
/**
26+
* TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC: DML statements are first executed with a
27+
* standard atomic transaction. If that fails due to the mutation limit being exceeded, the
28+
* statement will automatically be retried using a Partitioned DML transaction. These statements
29+
* are not guaranteed to be atomic. The corresponding {@link TransactionRetryListener} methods
30+
* will be invoked when a DML statement falls back to Partitioned DML.
31+
*/
32+
TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC;
2333

2434
private final String statementString;
2535

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -2125,6 +2125,7 @@ UnitOfWork createNewUnitOfWork(
21252125
.setReadOnly(getConnectionPropertyValue(READONLY))
21262126
.setReadOnlyStaleness(getConnectionPropertyValue(READ_ONLY_STALENESS))
21272127
.setAutocommitDmlMode(getConnectionPropertyValue(AUTOCOMMIT_DML_MODE))
2128+
.setTransactionRetryListeners(transactionRetryListeners)
21282129
.setReturnCommitStats(getConnectionPropertyValue(RETURN_COMMIT_STATS))
21292130
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
21302131
.setMaxCommitDelay(getConnectionPropertyValue(MAX_COMMIT_DELAY))

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java

+1-10
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction {
153153
private final SavepointSupport savepointSupport;
154154
private int transactionRetryAttempts;
155155
private int successfulRetries;
156-
private final List<TransactionRetryListener> transactionRetryListeners;
157156
private volatile ApiFuture<TransactionContext> txContextFuture;
158157
private boolean canUseSingleUseRead;
159158
private volatile SettableApiFuture<CommitResponse> commitResponseFuture;
@@ -203,7 +202,6 @@ static class Builder extends AbstractMultiUseTransaction.Builder<Builder, ReadWr
203202
private boolean returnCommitStats;
204203
private Duration maxCommitDelay;
205204
private SavepointSupport savepointSupport;
206-
private List<TransactionRetryListener> transactionRetryListeners;
207205

208206
private Builder() {}
209207

@@ -253,19 +251,13 @@ Builder setSavepointSupport(SavepointSupport savepointSupport) {
253251
return this;
254252
}
255253

256-
Builder setTransactionRetryListeners(List<TransactionRetryListener> listeners) {
257-
Preconditions.checkNotNull(listeners);
258-
this.transactionRetryListeners = listeners;
259-
return this;
260-
}
261-
262254
@Override
263255
ReadWriteTransaction build() {
264256
Preconditions.checkState(dbClient != null, "No DatabaseClient client specified");
265257
Preconditions.checkState(
266258
retryAbortsInternally != null, "RetryAbortsInternally is not specified");
267259
Preconditions.checkState(
268-
transactionRetryListeners != null, "TransactionRetryListeners are not specified");
260+
hasTransactionRetryListeners(), "TransactionRetryListeners are not specified");
269261
Preconditions.checkState(savepointSupport != null, "SavepointSupport is not specified");
270262
return new ReadWriteTransaction(this);
271263
}
@@ -301,7 +293,6 @@ private ReadWriteTransaction(Builder builder) {
301293
this.keepAliveLock = this.keepTransactionAlive ? new ReentrantLock() : null;
302294
this.retryAbortsInternally = builder.retryAbortsInternally;
303295
this.savepointSupport = builder.savepointSupport;
304-
this.transactionRetryListeners = builder.transactionRetryListeners;
305296
this.transactionOptions = extractOptions(builder);
306297
}
307298

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java

+92-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.cloud.spanner.connection.AbstractStatementParser.RUN_BATCH_STATEMENT;
2121

2222
import com.google.api.core.ApiFuture;
23+
import com.google.api.core.ApiFutureCallback;
2324
import com.google.api.core.ApiFutures;
2425
import com.google.api.core.SettableApiFuture;
2526
import com.google.api.gax.longrunning.OperationFuture;
@@ -42,11 +43,10 @@
4243
import com.google.cloud.spanner.SpannerException;
4344
import com.google.cloud.spanner.SpannerExceptionFactory;
4445
import com.google.cloud.spanner.TimestampBound;
46+
import com.google.cloud.spanner.TransactionMutationLimitExceededException;
4547
import com.google.cloud.spanner.TransactionRunner;
46-
import com.google.cloud.spanner.Type;
4748
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
4849
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
49-
import com.google.cloud.spanner.connection.ReadWriteTransaction.Builder;
5050
import com.google.common.base.Preconditions;
5151
import com.google.common.collect.ImmutableList;
5252
import com.google.common.collect.Iterables;
@@ -56,6 +56,7 @@
5656
import io.opentelemetry.context.Scope;
5757
import java.time.Duration;
5858
import java.util.Arrays;
59+
import java.util.UUID;
5960
import java.util.concurrent.Callable;
6061
import javax.annotation.Nonnull;
6162

@@ -219,6 +220,11 @@ public boolean supportsDirectedReads(ParsedStatement parsedStatement) {
219220
return parsedStatement.isQuery();
220221
}
221222

223+
private boolean isRetryDmlAsPartitionedDml() {
224+
return this.autocommitDmlMode
225+
== AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC;
226+
}
227+
222228
private void checkAndMarkUsed() {
223229
Preconditions.checkState(!used, "This single-use transaction has already been used");
224230
used = true;
@@ -434,6 +440,7 @@ public ApiFuture<Long> executeUpdateAsync(
434440
ApiFuture<Long> res;
435441
switch (autocommitDmlMode) {
436442
case TRANSACTIONAL:
443+
case TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC:
437444
res =
438445
ApiFutures.transform(
439446
executeTransactionalUpdateAsync(callType, update, AnalyzeMode.NONE, options),
@@ -561,11 +568,89 @@ private ApiFuture<Tuple<Long, ResultSet>> executeTransactionalUpdateAsync(
561568
throw t;
562569
}
563570
};
564-
return executeStatementAsync(
565-
callType,
566-
update,
567-
callable,
568-
ImmutableList.of(SpannerGrpc.getExecuteSqlMethod(), SpannerGrpc.getCommitMethod()));
571+
ApiFuture<Tuple<Long, ResultSet>> transactionalResult =
572+
executeStatementAsync(
573+
callType,
574+
update,
575+
callable,
576+
ImmutableList.of(SpannerGrpc.getExecuteSqlMethod(), SpannerGrpc.getCommitMethod()));
577+
// Retry as Partitioned DML if the statement fails due to exceeding the mutation limit if that
578+
// option has been enabled.
579+
if (isRetryDmlAsPartitionedDml()) {
580+
return addRetryUpdateAsPartitionedDmlCallback(transactionalResult, callType, update, options);
581+
}
582+
return transactionalResult;
583+
}
584+
585+
/**
586+
* Adds a callback to the given future that retries the update statement using Partitioned DML if
587+
* the original statement fails with a {@link TransactionMutationLimitExceededException}.
588+
*/
589+
private ApiFuture<Tuple<Long, ResultSet>> addRetryUpdateAsPartitionedDmlCallback(
590+
ApiFuture<Tuple<Long, ResultSet>> transactionalResult,
591+
CallType callType,
592+
final ParsedStatement update,
593+
final UpdateOption... options) {
594+
// Catch TransactionMutationLimitExceededException and retry as Partitioned DML. All other
595+
// exceptions are just propagated.
596+
return ApiFutures.catchingAsync(
597+
transactionalResult,
598+
TransactionMutationLimitExceededException.class,
599+
mutationLimitExceededException -> {
600+
UUID executionId = UUID.randomUUID();
601+
// Invoke the retryDmlAsPartitionedDmlStarting method for the TransactionRetryListeners
602+
// that have been registered for the connection.
603+
for (TransactionRetryListener listener : this.transactionRetryListeners) {
604+
listener.retryDmlAsPartitionedDmlStarting(
605+
executionId, update.getStatement(), mutationLimitExceededException);
606+
}
607+
// Try to execute the DML statement as Partitioned DML.
608+
ApiFuture<Tuple<Long, ResultSet>> partitionedResult =
609+
ApiFutures.transform(
610+
executePartitionedUpdateAsync(callType, update, options),
611+
lowerBoundUpdateCount -> Tuple.of(lowerBoundUpdateCount, null),
612+
MoreExecutors.directExecutor());
613+
614+
// Add a callback to the future that invokes the TransactionRetryListeners after the
615+
// Partitioned DML statement finished. This will invoke either the Finished or Failed
616+
// method on the listeners.
617+
ApiFutures.addCallback(
618+
partitionedResult,
619+
new ApiFutureCallback<Tuple<Long, ResultSet>>() {
620+
@Override
621+
public void onFailure(Throwable throwable) {
622+
for (TransactionRetryListener listener :
623+
SingleUseTransaction.this.transactionRetryListeners) {
624+
listener.retryDmlAsPartitionedDmlFailed(
625+
executionId, update.getStatement(), throwable);
626+
}
627+
}
628+
629+
@Override
630+
public void onSuccess(Tuple<Long, ResultSet> result) {
631+
for (TransactionRetryListener listener :
632+
SingleUseTransaction.this.transactionRetryListeners) {
633+
listener.retryDmlAsPartitionedDmlFinished(
634+
executionId, update.getStatement(), result.x());
635+
}
636+
}
637+
},
638+
MoreExecutors.directExecutor());
639+
640+
// Catch any exception from the Partitioned DML execution and throw the original
641+
// TransactionMutationLimitExceededException instead.
642+
// The exception that is returned for the Partitioned DML statement is added to the
643+
// exception as a suppressed exception.
644+
return ApiFutures.catching(
645+
partitionedResult,
646+
Throwable.class,
647+
input -> {
648+
mutationLimitExceededException.addSuppressed(input);
649+
throw mutationLimitExceededException;
650+
},
651+
MoreExecutors.directExecutor());
652+
},
653+
MoreExecutors.directExecutor());
569654
}
570655

571656
private ApiFuture<ResultSet> analyzeTransactionalUpdateAsync(

0 commit comments

Comments
 (0)