Skip to content

Commit 6154008

Browse files
authored
fix: fix aborted handling of batchUpdateAsync (#421)
1 parent b35304e commit 6154008

File tree

2 files changed

+56
-10
lines changed

2 files changed

+56
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ public ApiFuture<long[]> batchUpdateAsync(Iterable<Statement> statements) {
540540
decreaseAsyncOperations();
541541
throw t;
542542
}
543-
final ApiFuture<long[]> updateCounts =
543+
ApiFuture<long[]> updateCounts =
544544
ApiFutures.transform(
545545
response,
546546
new ApiFunction<ExecuteBatchDmlResponse, long[]>() {
@@ -565,19 +565,24 @@ public long[] apply(ExecuteBatchDmlResponse input) {
565565
}
566566
},
567567
MoreExecutors.directExecutor());
568+
updateCounts =
569+
ApiFutures.catching(
570+
updateCounts,
571+
Throwable.class,
572+
new ApiFunction<Throwable, long[]>() {
573+
@Override
574+
public long[] apply(Throwable input) {
575+
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
576+
onError(e);
577+
throw e;
578+
}
579+
},
580+
MoreExecutors.directExecutor());
568581
updateCounts.addListener(
569582
new Runnable() {
570583
@Override
571584
public void run() {
572-
try {
573-
updateCounts.get();
574-
} catch (ExecutionException e) {
575-
onError(SpannerExceptionFactory.newSpannerException(e.getCause()));
576-
} catch (InterruptedException e) {
577-
onError(SpannerExceptionFactory.propagateInterrupt(e));
578-
} finally {
579-
decreaseAsyncOperations();
580-
}
585+
decreaseAsyncOperations();
581586
}
582587
},
583588
MoreExecutors.directExecutor());

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,47 @@ public ApiFuture<long[]> apply(TransactionContext txn, Void input)
734734
CommitRequest.class);
735735
}
736736

737+
@Test
738+
public void asyncTransactionManagerBatchUpdateAbortedBeforeFirstStatement() throws Exception {
739+
final AtomicInteger attempt = new AtomicInteger();
740+
try (AsyncTransactionManager mgr = clientWithEmptySessionPool().transactionManagerAsync()) {
741+
TransactionContextFuture txn = mgr.beginAsync();
742+
while (true) {
743+
try {
744+
txn.then(
745+
new AsyncTransactionFunction<Void, long[]>() {
746+
@Override
747+
public ApiFuture<long[]> apply(TransactionContext txn, Void input)
748+
throws Exception {
749+
if (attempt.incrementAndGet() == 1) {
750+
mockSpanner.abortTransaction(txn);
751+
}
752+
return txn.batchUpdateAsync(
753+
ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT));
754+
}
755+
},
756+
executor)
757+
.commitAsync()
758+
.get();
759+
break;
760+
} catch (AbortedException e) {
761+
txn = mgr.resetForRetryAsync();
762+
}
763+
}
764+
}
765+
assertThat(attempt.get()).isEqualTo(2);
766+
// There should only be 1 CommitRequest, as the first attempt should abort already after the
767+
// ExecuteBatchDmlRequest.
768+
assertThat(mockSpanner.getRequestTypes())
769+
.containsExactly(
770+
BatchCreateSessionsRequest.class,
771+
BeginTransactionRequest.class,
772+
ExecuteBatchDmlRequest.class,
773+
BeginTransactionRequest.class,
774+
ExecuteBatchDmlRequest.class,
775+
CommitRequest.class);
776+
}
777+
737778
@Test
738779
public void asyncTransactionManagerWithBatchUpdateCommitAborted() throws Exception {
739780
try (AsyncTransactionManager mgr = clientWithEmptySessionPool().transactionManagerAsync()) {

0 commit comments

Comments
 (0)