From 46e0d3e8dfd55cdf432556d3ff02449c6cf819bd Mon Sep 17 00:00:00 2001 From: Wei Deng Date: Tue, 19 Mar 2024 13:05:02 -0700 Subject: [PATCH 1/6] Add support for transaction-level exclusion from change streams --- .../com/google/cloud/spanner/Options.java | 40 +++ .../spanner/PartitionedDmlTransaction.java | 8 +- .../com/google/cloud/spanner/SessionImpl.java | 25 +- .../cloud/spanner/TransactionRunnerImpl.java | 4 +- .../cloud/spanner/DatabaseClientImplTest.java | 264 ++++++++++++++++++ .../com/google/cloud/spanner/OptionsTest.java | 38 +++ 6 files changed, 370 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 76d0f24225a..4819e8bb1f7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -61,6 +61,9 @@ public interface ReadOption {} public interface ReadQueryUpdateTransactionOption extends ReadOption, QueryOption, UpdateOption, TransactionOption {} + /** Marker interface to mark options applicable to Update and Write operations */ + public interface UpdateTransactionOption extends UpdateOption, TransactionOption {} + /** * Marker interface to mark options applicable to Create, Update and Delete operations in admin * API. @@ -108,6 +111,17 @@ public static TransactionOption commitStats() { public static TransactionOption optimisticLock() { return OPTIMISTIC_LOCK_OPTION; } + + /** + * Specifying this instructs the transaction to be excluded from being recorded in change streams + * with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from + * being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or + * unset. + */ + public static UpdateTransactionOption excludeTxnFromChangeStreams() { + return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION; + } + /** * Specifying this will cause the read to yield at most this many rows. This should be greater * than 0. @@ -282,6 +296,18 @@ void appendToOptions(Options options) { static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption(); + /** Option to request the transaction to be excluded from change streams. */ + static final class ExcludeTxnFromChangeStreamsOption extends InternalOption + implements UpdateTransactionOption { + @Override + void appendToOptions(Options options) { + options.withExcludeTxnFromChangeStreams = true; + } + } + + static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION = + new ExcludeTxnFromChangeStreamsOption(); + /** Option pertaining to flow control. */ static final class FlowControlOption extends InternalOption implements ReadAndQueryOption { final int prefetchChunks; @@ -406,6 +432,7 @@ void appendToOptions(Options options) { private String etag; private Boolean validateOnly; private Boolean withOptimisticLock; + private Boolean withExcludeTxnFromChangeStreams; private Boolean dataBoostEnabled; private DirectedReadOptions directedReadOptions; private DecodeMode decodeMode; @@ -509,6 +536,10 @@ Boolean withOptimisticLock() { return withOptimisticLock; } + Boolean withExcludeTxnFromChangeStreams() { + return withExcludeTxnFromChangeStreams; + } + boolean hasDataBoostEnabled() { return dataBoostEnabled != null; } @@ -572,6 +603,11 @@ public String toString() { if (withOptimisticLock != null) { b.append("withOptimisticLock: ").append(withOptimisticLock).append(' '); } + if (withExcludeTxnFromChangeStreams != null) { + b.append("withExcludeTxnFromChangeStreams: ") + .append(withExcludeTxnFromChangeStreams) + .append(' '); + } if (dataBoostEnabled != null) { b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' '); } @@ -617,6 +653,7 @@ public boolean equals(Object o) { && Objects.equals(etag(), that.etag()) && Objects.equals(validateOnly(), that.validateOnly()) && Objects.equals(withOptimisticLock(), that.withOptimisticLock()) + && Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams()) && Objects.equals(dataBoostEnabled(), that.dataBoostEnabled()) && Objects.equals(directedReadOptions(), that.directedReadOptions()); } @@ -663,6 +700,9 @@ public int hashCode() { if (withOptimisticLock != null) { result = 31 * result + withOptimisticLock.hashCode(); } + if (withExcludeTxnFromChangeStreams != null) { + result = 31 * result + withExcludeTxnFromChangeStreams.hashCode(); + } if (dataBoostEnabled != null) { result = 31 * result + dataBoostEnabled.hashCode(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index cabc270566c..d498bb232a1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest( @VisibleForTesting ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) { - ByteString transactionId = initTransaction(); + ByteString transactionId = initTransaction(options); final TransactionSelector transactionSelector = TransactionSelector.newBuilder().setId(transactionId).build(); @@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt return builder.build(); } - private ByteString initTransaction() { + private ByteString initTransaction(final Options options) { final BeginTransactionRequest request = BeginTransactionRequest.newBuilder() .setSession(session.getName()) .setOptions( TransactionOptions.newBuilder() - .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) + .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()) + .setExcludeTxnFromChangeStreams( + options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)) .build(); Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); if (tx.getId().isEmpty()) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 81b00001105..83ee2812721 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -69,11 +69,16 @@ static void throwIfTransactionsPending() { } static TransactionOptions createReadWriteTransactionOptions(Options options) { + TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder(); + if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { + transactionOptions.setExcludeTxnFromChangeStreams(true); + } TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder(); if (options.withOptimisticLock() == Boolean.TRUE) { readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC); } - return TransactionOptions.newBuilder().setReadWrite(readWrite).build(); + transactionOptions.setReadWrite(readWrite); + return transactionOptions.build(); } /** @@ -181,10 +186,16 @@ public CommitResponse writeAtLeastOnceWithOptions( CommitRequest.newBuilder() .setSession(name) .setReturnCommitStats(options.withCommitStats()) - .addAllMutations(mutationsProto) - .setSingleUseTransaction( - TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())); + .addAllMutations(mutationsProto); + + TransactionOptions.Builder transactionOptionsBuilder = + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()); + if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { + transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true); + } + requestBuilder.setSingleUseTransaction(transactionOptionsBuilder); + if (options.hasMaxCommitDelay()) { requestBuilder.setMaxCommitDelay( Duration.newBuilder() @@ -238,6 +249,10 @@ public ServerStream batchWriteAtLeastOnce( if (batchWriteRequestOptions != null) { requestBuilder.setRequestOptions(batchWriteRequestOptions); } + if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams() + == Boolean.TRUE) { + requestBuilder.setExcludeTxnFromChangeStreams(true); + } ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE); try (IScope s = tracer.withSpan(span)) { return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 3249be1bdb3..3551338b6d3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -371,7 +371,9 @@ public void run() { if (transactionId == null && transactionIdFuture == null) { requestBuilder.setSingleUseTransaction( TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())); + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()) + .setExcludeTxnFromChangeStreams( + options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)); } else { requestBuilder.setTransactionId( transactionId == null diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 7cba80edd8d..9d72d6ff0a1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -75,6 +75,7 @@ import com.google.rpc.RetryInfo; import com.google.spanner.v1.BatchWriteRequest; import com.google.spanner.v1.BatchWriteResponse; +import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DeleteSessionRequest; import com.google.spanner.v1.DirectedReadOptions; @@ -1334,6 +1335,14 @@ public void testWrite() { Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); assertNotNull(timestamp); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); CommitRequest commit = commitRequests.get(0); @@ -1388,6 +1397,14 @@ public void testWriteWithOptions() { Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), Options.priority(RpcPriority.HIGH)); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List commits = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commits).hasSize(1); CommitRequest commit = commits.get(0); @@ -1409,6 +1426,24 @@ public void testWriteWithCommitStats() { assertNotNull(response.getCommitStats()); } + @Test + public void testWriteWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.writeWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.excludeTxnFromChangeStreams()); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + @Test public void testWriteAtLeastOnce() { DatabaseClient client = @@ -1418,6 +1453,15 @@ public void testWriteAtLeastOnce() { Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); assertNotNull(timestamp); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); } @Test @@ -1438,6 +1482,7 @@ public void testWriteAtLeastOnceWithCommitStats() { CommitRequest commit = commitRequests.get(0); assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); } @@ -1456,6 +1501,7 @@ public void testWriteAtLeastOnceWithOptions() { CommitRequest commit = commitRequests.get(0); assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); } @@ -1474,11 +1520,31 @@ public void testWriteAtLeastOnceWithTagOptions() { CommitRequest commit = commitRequests.get(0); assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); } + @Test + public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.excludeTxnFromChangeStreams()); + + System.out.println(mockSpanner.getRequests()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + } + @Test public void testBatchWriteAtLeastOnceWithoutOptions() { DatabaseClient client = @@ -1500,6 +1566,7 @@ public void testBatchWriteAtLeastOnceWithoutOptions() { BatchWriteRequest request = requests.get(0); assertEquals(request.getMutationGroupsCount(), 4); assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED); + assertFalse(request.getExcludeTxnFromChangeStreams()); } @Test @@ -1514,6 +1581,7 @@ public void testBatchWriteAtLeastOnceWithOptions() { BatchWriteRequest request = requests.get(0); assertEquals(request.getMutationGroupsCount(), 4); assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_LOW); + assertFalse(request.getExcludeTxnFromChangeStreams()); } @Test @@ -1529,6 +1597,21 @@ public void testBatchWriteAtLeastOnceWithTagOptions() { assertEquals(request.getMutationGroupsCount(), 4); assertEquals(request.getRequestOptions().getTransactionTag(), "app=spanner,env=test"); assertThat(request.getRequestOptions().getRequestTag()).isEmpty(); + assertFalse(request.getExcludeTxnFromChangeStreams()); + } + + @Test + public void testBatchWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + consumeBatchWriteStream( + client.batchWriteAtLeastOnce(MUTATION_GROUPS, Options.excludeTxnFromChangeStreams())); + + List requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class); + assertEquals(requests.size(), 1); + BatchWriteRequest request = requests.get(0); + assertEquals(request.getMutationGroupsCount(), 4); + assertTrue(request.getExcludeTxnFromChangeStreams()); } @Test @@ -1782,6 +1865,9 @@ public void testExecuteUpdateWithTag() { assertThat(request.getRequestOptions().getRequestTag()) .isEqualTo("app=spanner,env=test,action=update"); assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertFalse(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); } @Test @@ -1805,6 +1891,9 @@ public void testBatchUpdateWithTag() { .isEqualTo("app=spanner,env=test,action=batch"); assertThat(request.getRequestOptions().getTransactionTag()) .isEqualTo("app=spanner,env=test,action=txn"); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertFalse(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); } @Test @@ -1814,6 +1903,14 @@ public void testPartitionedDMLWithTag() { client.executePartitionedUpdate( UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=dml")); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasPartitionedDml()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertThat(requests).hasSize(1); ExecuteSqlRequest request = requests.get(0); @@ -1835,6 +1932,14 @@ public void testCommitWithTag() { return null; }); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1855,6 +1960,14 @@ public void testTransactionManagerCommitWithTag() { manager.commit(); } + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1877,6 +1990,14 @@ public void testAsyncRunnerCommitWithTag() { }, executor)); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1904,6 +2025,14 @@ public void testAsyncTransactionManagerCommitWithTag() { .commitAsync()); } + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1913,6 +2042,141 @@ public void testAsyncTransactionManagerCommitWithTag() { .isEqualTo("app=spanner,env=test,action=manager"); } + @Test + public void testExecuteUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); + runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertTrue(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testBatchUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); + runner.run(transaction -> transaction.batchUpdate(Collections.singletonList(UPDATE_STATEMENT))); + + List requests = + mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class); + assertThat(requests).hasSize(1); + ExecuteBatchDmlRequest request = requests.get(0); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertTrue(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testPartitionedDMLWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.executePartitionedUpdate(UPDATE_STATEMENT, Options.excludeTxnFromChangeStreams()); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasPartitionedDml()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); + runner.run( + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; + }); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testTransactionManagerCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (TransactionManager manager = + client.transactionManager(Options.excludeTxnFromChangeStreams())) { + TransactionContext transaction = manager.begin(); + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + manager.commit(); + } + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testAsyncRunnerCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(Options.excludeTxnFromChangeStreams()); + get( + runner.runAsync( + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor)); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testAsyncTransactionManagerCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (AsyncTransactionManager manager = + client.transactionManagerAsync(Options.excludeTxnFromChangeStreams())) { + TransactionContextFuture transaction = manager.beginAsync(); + get( + transaction + .then( + (txn, input) -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor) + .commitAsync()); + } + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + @Test public void singleUse() { DatabaseClientImpl client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index e0bbf81f297..8c9a5d957e8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -100,6 +101,7 @@ public void allOptionsAbsent() { assertThat(options.hasTag()).isFalse(); assertThat(options.hasDataBoostEnabled()).isFalse(); assertThat(options.hasDirectedReadOptions()).isFalse(); + assertNull(options.withExcludeTxnFromChangeStreams()); assertThat(options.toString()).isEqualTo(""); assertThat(options.equals(options)).isTrue(); assertThat(options.equals(null)).isFalse(); @@ -691,4 +693,40 @@ public void directedReadHashCode() { public void directedReadsNullNotAllowed() { assertThrows(NullPointerException.class, () -> Options.directedRead(null)); } + + @Test + public void transactionOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromTransactionOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromTransactionOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromTransactionOptions(); + + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertNotEquals(option1, option3); + assertNotEquals(option1.hashCode(), option3.hashCode()); + + assertTrue(option1.withExcludeTxnFromChangeStreams()); + assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + + assertNull(option3.withExcludeTxnFromChangeStreams()); + assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + } + + @Test + public void updateOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromUpdateOptions(); + + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertNotEquals(option1, option3); + assertNotEquals(option1.hashCode(), option3.hashCode()); + + assertTrue(option1.withExcludeTxnFromChangeStreams()); + assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + + assertNull(option3.withExcludeTxnFromChangeStreams()); + assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + } } From aa7bc0cc1ae0bee0070733abc942e05d7371b22a Mon Sep 17 00:00:00 2001 From: Wei Deng Date: Tue, 19 Mar 2024 13:52:14 -0700 Subject: [PATCH 2/6] cleanup --- .../java/com/google/cloud/spanner/DatabaseClientImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 9d72d6ff0a1..4feac25e3ed 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -1535,8 +1535,6 @@ public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), Options.excludeTxnFromChangeStreams()); - System.out.println(mockSpanner.getRequests()); - List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); CommitRequest commit = commitRequests.get(0); From 96b508b50c633bfc58cc20c1b47649bf91ff68aa Mon Sep 17 00:00:00 2001 From: Wei Deng Date: Thu, 21 Mar 2024 12:06:18 -0700 Subject: [PATCH 3/6] refactor: introduce PartitionedUpdateOption --- .../clirr-ignored-differences.xml | 7 ++ .../google/cloud/spanner/DatabaseClient.java | 3 +- .../cloud/spanner/DatabaseClientImpl.java | 5 +- .../com/google/cloud/spanner/Options.java | 26 +++++-- .../spanner/PartitionedDmlTransaction.java | 8 ++- .../com/google/cloud/spanner/SessionImpl.java | 4 +- .../com/google/cloud/spanner/SessionPool.java | 5 +- .../com/google/cloud/spanner/OptionsTest.java | 67 ++++++++++++++++--- 8 files changed, 101 insertions(+), 24 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 5d3beb2e7b2..f7893cd83d4 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -631,4 +631,11 @@ com/google/cloud/spanner/connection/Connection void setDirectedRead(com.google.spanner.v1.DirectedReadOptions) + + + 7005 + com/google/cloud/spanner/DatabaseClient + long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$UpdateOption[]) + long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$PartitionedUpdateOption[]) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 06237131458..68cc604cfd8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -18,6 +18,7 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; @@ -600,5 +601,5 @@ ServerStream batchWriteAtLeastOnce( *

Given the above, Partitioned DML is good fit for large, database-wide, operations that are * idempotent, such as deleting old rows from a very large table. */ - long executePartitionedUpdate(Statement stmt, UpdateOption... options); + long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index b63ad379305..f19d492b492 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -18,8 +18,8 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.common.annotations.VisibleForTesting; @@ -240,7 +240,8 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti } @Override - public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { + public long executePartitionedUpdate( + final Statement stmt, final PartitionedUpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 3dbd0c1cda3..ca60eea4c96 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -61,9 +61,6 @@ public interface ReadOption {} public interface ReadQueryUpdateTransactionOption extends ReadOption, QueryOption, UpdateOption, TransactionOption {} - /** Marker interface to mark options applicable to Update and Write operations */ - public interface UpdateTransactionOption extends UpdateOption, TransactionOption {} - /** * Marker interface to mark options applicable to Create, Update and Delete operations in admin * API. @@ -86,8 +83,15 @@ public interface QueryOption {} /** Marker interface to mark options applicable to write operations */ public interface TransactionOption {} + /** Marker interface to mark options applicable to partitioned update */ + public interface PartitionedUpdateOption {} + /** Marker interface to mark options applicable to update operation. */ - public interface UpdateOption {} + public interface UpdateOption extends PartitionedUpdateOption {} + + /** Marker interface to mark options applicable to partitioned update and write operations */ + public interface PartitionedUpdateTransactionOption + extends PartitionedUpdateOption, TransactionOption {} /** Marker interface to mark options applicable to list operations in admin API. */ public interface ListOption {} @@ -118,7 +122,7 @@ public static TransactionOption optimisticLock() { * being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or * unset. */ - public static UpdateTransactionOption excludeTxnFromChangeStreams() { + public static PartitionedUpdateTransactionOption excludeTxnFromChangeStreams() { return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION; } @@ -297,7 +301,7 @@ void appendToOptions(Options options) { /** Option to request the transaction to be excluded from change streams. */ static final class ExcludeTxnFromChangeStreamsOption extends InternalOption - implements UpdateTransactionOption { + implements PartitionedUpdateTransactionOption { @Override void appendToOptions(Options options) { options.withExcludeTxnFromChangeStreams = true; @@ -744,6 +748,16 @@ static Options fromUpdateOptions(UpdateOption... options) { return updateOptions; } + static Options fromPartitinoedUpdateOptions(PartitionedUpdateOption... options) { + Options partitionedUpdateOptions = new Options(); + for (PartitionedUpdateOption option : options) { + if (option instanceof InternalOption) { + ((InternalOption) option).appendToOptions(partitionedUpdateOptions); + } + } + return partitionedUpdateOptions; + } + static Options fromTransactionOptions(TransactionOption... options) { Options transactionOptions = new Options(); for (TransactionOption option : options) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index d498bb232a1..c811cedb845 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -25,7 +25,7 @@ import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; -import com.google.cloud.spanner.Options.UpdateOption; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -71,7 +71,9 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction * last seen resume token if the server returns any. */ long executeStreamingPartitionedUpdate( - final Statement statement, final Duration timeout, final UpdateOption... updateOptions) { + final Statement statement, + final Duration timeout, + final PartitionedUpdateOption... partitionedUpdateOptions) { checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement"); @@ -79,7 +81,7 @@ long executeStreamingPartitionedUpdate( boolean foundStats = false; long updateCount = 0L; Stopwatch stopwatch = Stopwatch.createStarted(ticker); - Options options = Options.fromUpdateOptions(updateOptions); + Options options = Options.fromPartitinoedUpdateOptions(partitionedUpdateOptions); try { ExecuteSqlRequest request = newTransactionRequestFrom(statement, options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 83ee2812721..215b16f5bfb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -26,8 +26,8 @@ import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -140,7 +140,7 @@ void markUsed(Instant instant) { } @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 8058802a8fc..ec58a728c04 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -52,6 +52,7 @@ import com.google.cloud.Tuple; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.Options.TransactionOption; @@ -1270,7 +1271,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti } @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) { try { return get(true).executePartitionedUpdate(stmt, options); } finally { @@ -1470,7 +1471,7 @@ public ServerStream batchWriteAtLeastOnce( } @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) + public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) throws SpannerException { try { markUsed(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 8c9a5d957e8..5c223c80eb5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -460,6 +460,37 @@ public void testUpdateOptionsWithPriorityHashCode() { assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode()); } + @Test + public void testPartitionedUpdateOptions() { + Options option1 = Options.fromPartitinoedUpdateOptions(); + Options option2 = Options.fromPartitinoedUpdateOptions(); + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertEquals(option1.toString(), option2.toString()); + assertEquals("", option1.toString()); + } + + @Test + public void testPartitionedUpdateOptionsWithPriority() { + Options optionsWithHighPriority1 = + Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH)); + assertEquals(Priority.PRIORITY_HIGH, optionsWithHighPriority1.priority()); + assertEquals("priority: HIGH ", optionsWithHighPriority1.toString()); + + Options optionsWithHighPriority2 = + Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH)); + assertEquals(optionsWithHighPriority1, optionsWithHighPriority2); + assertEquals(optionsWithHighPriority1.hashCode(), optionsWithHighPriority2.hashCode()); + assertEquals(optionsWithHighPriority1.toString(), optionsWithHighPriority2.toString()); + + Options optionsWithMediumPriority = + Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.MEDIUM)); + assertEquals(Priority.PRIORITY_MEDIUM, optionsWithMediumPriority.priority()); + assertEquals("priority: MEDIUM ", optionsWithMediumPriority.toString()); + assertNotEquals(optionsWithHighPriority1, optionsWithMediumPriority); + assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode()); + } + @Test public void testQueryOptionsEquality() { Options option1 = Options.fromQueryOptions(); @@ -617,6 +648,26 @@ public void updateEquality() { assertThat(o2.equals(o3)).isFalse(); } + @Test + public void partitionedUpdateWithTag() { + String tag1 = "app=spanner,env=test"; + Options o1 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1)); + assertEquals(tag1, o1.tag()); + assertEquals("tag: " + tag1 + " ", o1.toString()); + + Options o2 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1)); + assertEquals(o1, o2); + assertEquals(o1.hashCode(), o2.hashCode()); + assertEquals(o1.toString(), o2.toString()); + + String tag2 = "app=spanner,env=stage"; + Options o3 = Options.fromPartitinoedUpdateOptions(Options.tag(tag2)); + assertEquals("tag: " + tag2 + " ", o3.toString()); + assertNotEquals(o2, o3); + assertNotEquals(o2.hashCode(), o3.hashCode()); + assertNotEquals(o2.toString(), o3.toString()); + } + @Test public void transactionOptionsTest() { String tag = "app=spanner,env=test"; @@ -706,17 +757,17 @@ public void transactionOptionsExcludeTxnFromChangeStreams() { assertNotEquals(option1.hashCode(), option3.hashCode()); assertTrue(option1.withExcludeTxnFromChangeStreams()); - assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString()); assertNull(option3.withExcludeTxnFromChangeStreams()); - assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + assertEquals("", option3.toString()); } @Test - public void updateOptionsExcludeTxnFromChangeStreams() { - Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); - Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); - Options option3 = Options.fromUpdateOptions(); + public void partitionedUpdateOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromPartitinoedUpdateOptions(); assertEquals(option1, option2); assertEquals(option1.hashCode(), option2.hashCode()); @@ -724,9 +775,9 @@ public void updateOptionsExcludeTxnFromChangeStreams() { assertNotEquals(option1.hashCode(), option3.hashCode()); assertTrue(option1.withExcludeTxnFromChangeStreams()); - assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString()); assertNull(option3.withExcludeTxnFromChangeStreams()); - assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + assertEquals("", option3.toString()); } } From 3f9c0f3fee3e3303c03a3daf3962cc62330a16ab Mon Sep 17 00:00:00 2001 From: Wei Deng Date: Mon, 25 Mar 2024 15:41:51 -0700 Subject: [PATCH 4/6] Revert "refactor: introduce PartitionedUpdateOption" This reverts commit 96b508b50c633bfc58cc20c1b47649bf91ff68aa. --- .../clirr-ignored-differences.xml | 7 -- .../google/cloud/spanner/DatabaseClient.java | 3 +- .../cloud/spanner/DatabaseClientImpl.java | 5 +- .../com/google/cloud/spanner/Options.java | 26 ++----- .../spanner/PartitionedDmlTransaction.java | 8 +-- .../com/google/cloud/spanner/SessionImpl.java | 4 +- .../com/google/cloud/spanner/SessionPool.java | 5 +- .../com/google/cloud/spanner/OptionsTest.java | 67 +++---------------- 8 files changed, 24 insertions(+), 101 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index f7893cd83d4..5d3beb2e7b2 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -631,11 +631,4 @@ com/google/cloud/spanner/connection/Connection void setDirectedRead(com.google.spanner.v1.DirectedReadOptions) - - - 7005 - com/google/cloud/spanner/DatabaseClient - long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$UpdateOption[]) - long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$PartitionedUpdateOption[]) - diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 68cc604cfd8..06237131458 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -18,7 +18,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; @@ -601,5 +600,5 @@ ServerStream batchWriteAtLeastOnce( *

Given the above, Partitioned DML is good fit for large, database-wide, operations that are * idempotent, such as deleting old rows from a very large table. */ - long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options); + long executePartitionedUpdate(Statement stmt, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index f19d492b492..b63ad379305 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -18,8 +18,8 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.common.annotations.VisibleForTesting; @@ -240,8 +240,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti } @Override - public long executePartitionedUpdate( - final Statement stmt, final PartitionedUpdateOption... options) { + public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index ca60eea4c96..3dbd0c1cda3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -61,6 +61,9 @@ public interface ReadOption {} public interface ReadQueryUpdateTransactionOption extends ReadOption, QueryOption, UpdateOption, TransactionOption {} + /** Marker interface to mark options applicable to Update and Write operations */ + public interface UpdateTransactionOption extends UpdateOption, TransactionOption {} + /** * Marker interface to mark options applicable to Create, Update and Delete operations in admin * API. @@ -83,15 +86,8 @@ public interface QueryOption {} /** Marker interface to mark options applicable to write operations */ public interface TransactionOption {} - /** Marker interface to mark options applicable to partitioned update */ - public interface PartitionedUpdateOption {} - /** Marker interface to mark options applicable to update operation. */ - public interface UpdateOption extends PartitionedUpdateOption {} - - /** Marker interface to mark options applicable to partitioned update and write operations */ - public interface PartitionedUpdateTransactionOption - extends PartitionedUpdateOption, TransactionOption {} + public interface UpdateOption {} /** Marker interface to mark options applicable to list operations in admin API. */ public interface ListOption {} @@ -122,7 +118,7 @@ public static TransactionOption optimisticLock() { * being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or * unset. */ - public static PartitionedUpdateTransactionOption excludeTxnFromChangeStreams() { + public static UpdateTransactionOption excludeTxnFromChangeStreams() { return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION; } @@ -301,7 +297,7 @@ void appendToOptions(Options options) { /** Option to request the transaction to be excluded from change streams. */ static final class ExcludeTxnFromChangeStreamsOption extends InternalOption - implements PartitionedUpdateTransactionOption { + implements UpdateTransactionOption { @Override void appendToOptions(Options options) { options.withExcludeTxnFromChangeStreams = true; @@ -748,16 +744,6 @@ static Options fromUpdateOptions(UpdateOption... options) { return updateOptions; } - static Options fromPartitinoedUpdateOptions(PartitionedUpdateOption... options) { - Options partitionedUpdateOptions = new Options(); - for (PartitionedUpdateOption option : options) { - if (option instanceof InternalOption) { - ((InternalOption) option).appendToOptions(partitionedUpdateOptions); - } - } - return partitionedUpdateOptions; - } - static Options fromTransactionOptions(TransactionOption... options) { Options transactionOptions = new Options(); for (TransactionOption option : options) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index c811cedb845..d498bb232a1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -25,7 +25,7 @@ import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; -import com.google.cloud.spanner.Options.PartitionedUpdateOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -71,9 +71,7 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction * last seen resume token if the server returns any. */ long executeStreamingPartitionedUpdate( - final Statement statement, - final Duration timeout, - final PartitionedUpdateOption... partitionedUpdateOptions) { + final Statement statement, final Duration timeout, final UpdateOption... updateOptions) { checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement"); @@ -81,7 +79,7 @@ long executeStreamingPartitionedUpdate( boolean foundStats = false; long updateCount = 0L; Stopwatch stopwatch = Stopwatch.createStarted(ticker); - Options options = Options.fromPartitinoedUpdateOptions(partitionedUpdateOptions); + Options options = Options.fromUpdateOptions(updateOptions); try { ExecuteSqlRequest request = newTransactionRequestFrom(statement, options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 215b16f5bfb..83ee2812721 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -26,8 +26,8 @@ import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; -import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -140,7 +140,7 @@ void markUsed(Instant instant) { } @Override - public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) { + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index ec58a728c04..8058802a8fc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -52,7 +52,6 @@ import com.google.cloud.Tuple; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; -import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.Options.TransactionOption; @@ -1271,7 +1270,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti } @Override - public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) { + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { try { return get(true).executePartitionedUpdate(stmt, options); } finally { @@ -1471,7 +1470,7 @@ public ServerStream batchWriteAtLeastOnce( } @Override - public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) throws SpannerException { try { markUsed(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 5c223c80eb5..8c9a5d957e8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -460,37 +460,6 @@ public void testUpdateOptionsWithPriorityHashCode() { assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode()); } - @Test - public void testPartitionedUpdateOptions() { - Options option1 = Options.fromPartitinoedUpdateOptions(); - Options option2 = Options.fromPartitinoedUpdateOptions(); - assertEquals(option1, option2); - assertEquals(option1.hashCode(), option2.hashCode()); - assertEquals(option1.toString(), option2.toString()); - assertEquals("", option1.toString()); - } - - @Test - public void testPartitionedUpdateOptionsWithPriority() { - Options optionsWithHighPriority1 = - Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH)); - assertEquals(Priority.PRIORITY_HIGH, optionsWithHighPriority1.priority()); - assertEquals("priority: HIGH ", optionsWithHighPriority1.toString()); - - Options optionsWithHighPriority2 = - Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH)); - assertEquals(optionsWithHighPriority1, optionsWithHighPriority2); - assertEquals(optionsWithHighPriority1.hashCode(), optionsWithHighPriority2.hashCode()); - assertEquals(optionsWithHighPriority1.toString(), optionsWithHighPriority2.toString()); - - Options optionsWithMediumPriority = - Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.MEDIUM)); - assertEquals(Priority.PRIORITY_MEDIUM, optionsWithMediumPriority.priority()); - assertEquals("priority: MEDIUM ", optionsWithMediumPriority.toString()); - assertNotEquals(optionsWithHighPriority1, optionsWithMediumPriority); - assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode()); - } - @Test public void testQueryOptionsEquality() { Options option1 = Options.fromQueryOptions(); @@ -648,26 +617,6 @@ public void updateEquality() { assertThat(o2.equals(o3)).isFalse(); } - @Test - public void partitionedUpdateWithTag() { - String tag1 = "app=spanner,env=test"; - Options o1 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1)); - assertEquals(tag1, o1.tag()); - assertEquals("tag: " + tag1 + " ", o1.toString()); - - Options o2 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1)); - assertEquals(o1, o2); - assertEquals(o1.hashCode(), o2.hashCode()); - assertEquals(o1.toString(), o2.toString()); - - String tag2 = "app=spanner,env=stage"; - Options o3 = Options.fromPartitinoedUpdateOptions(Options.tag(tag2)); - assertEquals("tag: " + tag2 + " ", o3.toString()); - assertNotEquals(o2, o3); - assertNotEquals(o2.hashCode(), o3.hashCode()); - assertNotEquals(o2.toString(), o3.toString()); - } - @Test public void transactionOptionsTest() { String tag = "app=spanner,env=test"; @@ -757,17 +706,17 @@ public void transactionOptionsExcludeTxnFromChangeStreams() { assertNotEquals(option1.hashCode(), option3.hashCode()); assertTrue(option1.withExcludeTxnFromChangeStreams()); - assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString()); + assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); assertNull(option3.withExcludeTxnFromChangeStreams()); - assertEquals("", option3.toString()); + assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); } @Test - public void partitionedUpdateOptionsExcludeTxnFromChangeStreams() { - Options option1 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams()); - Options option2 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams()); - Options option3 = Options.fromPartitinoedUpdateOptions(); + public void updateOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromUpdateOptions(); assertEquals(option1, option2); assertEquals(option1.hashCode(), option2.hashCode()); @@ -775,9 +724,9 @@ public void partitionedUpdateOptionsExcludeTxnFromChangeStreams() { assertNotEquals(option1.hashCode(), option3.hashCode()); assertTrue(option1.withExcludeTxnFromChangeStreams()); - assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString()); + assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); assertNull(option3.withExcludeTxnFromChangeStreams()); - assertEquals("", option3.toString()); + assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); } } From dea6bf5f65190cde54a8507293800803ef49bcbc Mon Sep 17 00:00:00 2001 From: Wei Deng Date: Tue, 26 Mar 2024 15:32:05 -0700 Subject: [PATCH 5/6] Add error handling in DML update APIs where excludeTxnFromChangeStreams option is not applicable --- .../cloud/spanner/TransactionRunnerImpl.java | 46 ++++-- .../cloud/spanner/DatabaseClientImplTest.java | 138 +++++++++++++++++- 2 files changed, 168 insertions(+), 16 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 3551338b6d3..370d2f662f5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -76,6 +76,10 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE = "Transaction has already committed"; + private static final String DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE = + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests. " + + "This option should be set at the transaction level."; + @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { @@ -727,14 +731,16 @@ public long executeUpdate(Statement statement, UpdateOption... options) { } private ResultSet internalExecuteUpdate( - Statement statement, QueryMode queryMode, UpdateOption... options) { + Statement statement, QueryMode queryMode, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, - queryMode, - Options.fromUpdateOptions(options), - /* withTransactionSelector = */ true); + statement, queryMode, options, /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader()); @@ -755,14 +761,16 @@ private ResultSet internalExecuteUpdate( } @Override - public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options) { + public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, - QueryMode.NORMAL, - Options.fromUpdateOptions(options), - /* withTransactionSelector = */ true); + statement, QueryMode.NORMAL, options, /* withTransactionSelector = */ true); final ApiFuture resultSet; try { // Register the update as an async operation that must finish before the transaction may @@ -834,10 +842,15 @@ private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlRespon } @Override - public long[] batchUpdate(Iterable statements, UpdateOption... options) { + public long[] batchUpdate(Iterable statements, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteBatchDmlRequest.Builder builder = - getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options)); + getExecuteBatchDmlRequestBuilder(statements, options); try { com.google.spanner.v1.ExecuteBatchDmlResponse response = rpc.executeBatchDml(builder.build(), session.getOptions()); @@ -871,10 +884,15 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option @Override public ApiFuture batchUpdateAsync( - Iterable statements, UpdateOption... options) { + Iterable statements, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteBatchDmlRequest.Builder builder = - getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options)); + getExecuteBatchDmlRequestBuilder(statements, options); ApiFuture response; try { // Register the update as an async operation that must finish before the transaction may diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 4feac25e3ed..4ab6829a327 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -2041,7 +2041,7 @@ public void testAsyncTransactionManagerCommitWithTag() { } @Test - public void testExecuteUpdateWithExcludeTxnFromChangeStreams() { + public void testReadWriteTxnWithExcludeTxnFromChangeStreams_executeUpdate() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); @@ -2056,7 +2056,7 @@ public void testExecuteUpdateWithExcludeTxnFromChangeStreams() { } @Test - public void testBatchUpdateWithExcludeTxnFromChangeStreams() { + public void testReadWriteTxnWithExcludeTxnFromChangeStreams_batchUpdate() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); @@ -2175,6 +2175,140 @@ public void testAsyncTransactionManagerCommitWithExcludeTxnFromChangeStreams() { assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); } + @Test + public void testExecuteUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.executeUpdate( + UPDATE_STATEMENT, Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testExecuteUpdateAsyncWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + get( + runner.runAsync( + txn -> { + txn.executeUpdateAsync( + UPDATE_STATEMENT, Options.excludeTxnFromChangeStreams()); + return ApiFutures.immediateFuture(null); + }, + executor))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testAnalyzeUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.analyzeUpdate( + UPDATE_STATEMENT, + QueryAnalyzeMode.PROFILE, + Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testAnalyzeUpdateStatementWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.analyzeUpdateStatement( + UPDATE_STATEMENT, + QueryAnalyzeMode.PROFILE, + Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testBatchUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.batchUpdate( + Collections.singletonList(UPDATE_STATEMENT), + Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testBatchUpdateAsyncWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + get( + runner.runAsync( + txn -> { + txn.batchUpdateAsync( + Collections.singletonList(UPDATE_STATEMENT), + Options.excludeTxnFromChangeStreams()); + return ApiFutures.immediateFuture(null); + }, + executor))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + @Test public void singleUse() { DatabaseClientImpl client = From 39c79ac86bd2f4d10db80f1b25b278f3c76395c8 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 28 Mar 2024 17:11:45 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4718890c54f..b8fd2f3be7c 100644 --- a/README.md +++ b/README.md @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-spanner' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-spanner:6.61.0' +implementation 'com.google.cloud:google-cloud-spanner:6.62.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.61.0" +libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.62.0" ``` @@ -650,7 +650,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.61.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.62.0 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles