Skip to content

Commit 7ae376a

Browse files
feat: add support for transaction-level exclusion from change streams (#2959)
* Add support for transaction-level exclusion from change streams * cleanup * refactor: introduce PartitionedUpdateOption * Revert "refactor: introduce PartitionedUpdateOption" This reverts commit 96b508b. * Add error handling in DML update APIs where excludeTxnFromChangeStreams option is not applicable * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 58ef1fe commit 7ae376a

File tree

6 files changed

+534
-23
lines changed

6 files changed

+534
-23
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+40
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public interface ReadOption {}
6161
public interface ReadQueryUpdateTransactionOption
6262
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}
6363

64+
/** Marker interface to mark options applicable to Update and Write operations */
65+
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}
66+
6467
/**
6568
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
6669
* API.
@@ -108,6 +111,17 @@ public static TransactionOption commitStats() {
108111
public static TransactionOption optimisticLock() {
109112
return OPTIMISTIC_LOCK_OPTION;
110113
}
114+
115+
/**
116+
* Specifying this instructs the transaction to be excluded from being recorded in change streams
117+
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
118+
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
119+
* unset.
120+
*/
121+
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
122+
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
123+
}
124+
111125
/**
112126
* Specifying this will cause the read to yield at most this many rows. This should be greater
113127
* than 0.
@@ -281,6 +295,18 @@ void appendToOptions(Options options) {
281295

282296
static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();
283297

298+
/** Option to request the transaction to be excluded from change streams. */
299+
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
300+
implements UpdateTransactionOption {
301+
@Override
302+
void appendToOptions(Options options) {
303+
options.withExcludeTxnFromChangeStreams = true;
304+
}
305+
}
306+
307+
static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION =
308+
new ExcludeTxnFromChangeStreamsOption();
309+
284310
/** Option pertaining to flow control. */
285311
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
286312
final int prefetchChunks;
@@ -405,6 +431,7 @@ void appendToOptions(Options options) {
405431
private String etag;
406432
private Boolean validateOnly;
407433
private Boolean withOptimisticLock;
434+
private Boolean withExcludeTxnFromChangeStreams;
408435
private Boolean dataBoostEnabled;
409436
private DirectedReadOptions directedReadOptions;
410437
private DecodeMode decodeMode;
@@ -508,6 +535,10 @@ Boolean withOptimisticLock() {
508535
return withOptimisticLock;
509536
}
510537

538+
Boolean withExcludeTxnFromChangeStreams() {
539+
return withExcludeTxnFromChangeStreams;
540+
}
541+
511542
boolean hasDataBoostEnabled() {
512543
return dataBoostEnabled != null;
513544
}
@@ -571,6 +602,11 @@ public String toString() {
571602
if (withOptimisticLock != null) {
572603
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
573604
}
605+
if (withExcludeTxnFromChangeStreams != null) {
606+
b.append("withExcludeTxnFromChangeStreams: ")
607+
.append(withExcludeTxnFromChangeStreams)
608+
.append(' ');
609+
}
574610
if (dataBoostEnabled != null) {
575611
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
576612
}
@@ -616,6 +652,7 @@ public boolean equals(Object o) {
616652
&& Objects.equals(etag(), that.etag())
617653
&& Objects.equals(validateOnly(), that.validateOnly())
618654
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
655+
&& Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams())
619656
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
620657
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
621658
}
@@ -662,6 +699,9 @@ public int hashCode() {
662699
if (withOptimisticLock != null) {
663700
result = 31 * result + withOptimisticLock.hashCode();
664701
}
702+
if (withExcludeTxnFromChangeStreams != null) {
703+
result = 31 * result + withExcludeTxnFromChangeStreams.hashCode();
704+
}
665705
if (dataBoostEnabled != null) {
666706
result = 31 * result + dataBoostEnabled.hashCode();
667707
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest(
167167

168168
@VisibleForTesting
169169
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
170-
ByteString transactionId = initTransaction();
170+
ByteString transactionId = initTransaction(options);
171171

172172
final TransactionSelector transactionSelector =
173173
TransactionSelector.newBuilder().setId(transactionId).build();
@@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
195195
return builder.build();
196196
}
197197

198-
private ByteString initTransaction() {
198+
private ByteString initTransaction(final Options options) {
199199
final BeginTransactionRequest request =
200200
BeginTransactionRequest.newBuilder()
201201
.setSession(session.getName())
202202
.setOptions(
203203
TransactionOptions.newBuilder()
204-
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
204+
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
205+
.setExcludeTxnFromChangeStreams(
206+
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
205207
.build();
206208
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
207209
if (tx.getId().isEmpty()) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,16 @@ static void throwIfTransactionsPending() {
6969
}
7070

7171
static TransactionOptions createReadWriteTransactionOptions(Options options) {
72+
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
73+
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
74+
transactionOptions.setExcludeTxnFromChangeStreams(true);
75+
}
7276
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
7377
if (options.withOptimisticLock() == Boolean.TRUE) {
7478
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
7579
}
76-
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
80+
transactionOptions.setReadWrite(readWrite);
81+
return transactionOptions.build();
7782
}
7883

7984
/**
@@ -209,10 +214,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
209214
CommitRequest.newBuilder()
210215
.setSession(name)
211216
.setReturnCommitStats(options.withCommitStats())
212-
.addAllMutations(mutationsProto)
213-
.setSingleUseTransaction(
214-
TransactionOptions.newBuilder()
215-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
217+
.addAllMutations(mutationsProto);
218+
219+
TransactionOptions.Builder transactionOptionsBuilder =
220+
TransactionOptions.newBuilder()
221+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance());
222+
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
223+
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
224+
}
225+
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);
226+
216227
if (options.hasMaxCommitDelay()) {
217228
requestBuilder.setMaxCommitDelay(
218229
Duration.newBuilder()
@@ -266,6 +277,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
266277
if (batchWriteRequestOptions != null) {
267278
requestBuilder.setRequestOptions(batchWriteRequestOptions);
268279
}
280+
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
281+
== Boolean.TRUE) {
282+
requestBuilder.setExcludeTxnFromChangeStreams(true);
283+
}
269284
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
270285
try (IScope s = tracer.withSpan(span)) {
271286
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

+35-15
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
7676
private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
7777
"Transaction has already committed";
7878

79+
private static final String DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE =
80+
"Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests. "
81+
+ "This option should be set at the transaction level.";
82+
7983
@VisibleForTesting
8084
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
8185

@@ -371,7 +375,9 @@ public void run() {
371375
if (transactionId == null && transactionIdFuture == null) {
372376
requestBuilder.setSingleUseTransaction(
373377
TransactionOptions.newBuilder()
374-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
378+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
379+
.setExcludeTxnFromChangeStreams(
380+
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));
375381
} else {
376382
requestBuilder.setTransactionId(
377383
transactionId == null
@@ -725,14 +731,16 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
725731
}
726732

727733
private ResultSet internalExecuteUpdate(
728-
Statement statement, QueryMode queryMode, UpdateOption... options) {
734+
Statement statement, QueryMode queryMode, UpdateOption... updateOptions) {
729735
beforeReadOrQuery();
736+
final Options options = Options.fromUpdateOptions(updateOptions);
737+
if (options.withExcludeTxnFromChangeStreams() != null) {
738+
throw newSpannerException(
739+
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
740+
}
730741
final ExecuteSqlRequest.Builder builder =
731742
getExecuteSqlRequestBuilder(
732-
statement,
733-
queryMode,
734-
Options.fromUpdateOptions(options),
735-
/* withTransactionSelector = */ true);
743+
statement, queryMode, options, /* withTransactionSelector = */ true);
736744
try {
737745
com.google.spanner.v1.ResultSet resultSet =
738746
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
@@ -753,14 +761,16 @@ private ResultSet internalExecuteUpdate(
753761
}
754762

755763
@Override
756-
public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... options) {
764+
public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... updateOptions) {
757765
beforeReadOrQuery();
766+
final Options options = Options.fromUpdateOptions(updateOptions);
767+
if (options.withExcludeTxnFromChangeStreams() != null) {
768+
throw newSpannerException(
769+
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
770+
}
758771
final ExecuteSqlRequest.Builder builder =
759772
getExecuteSqlRequestBuilder(
760-
statement,
761-
QueryMode.NORMAL,
762-
Options.fromUpdateOptions(options),
763-
/* withTransactionSelector = */ true);
773+
statement, QueryMode.NORMAL, options, /* withTransactionSelector = */ true);
764774
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
765775
try {
766776
// Register the update as an async operation that must finish before the transaction may
@@ -832,10 +842,15 @@ private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlRespon
832842
}
833843

834844
@Override
835-
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... options) {
845+
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... updateOptions) {
836846
beforeReadOrQuery();
847+
final Options options = Options.fromUpdateOptions(updateOptions);
848+
if (options.withExcludeTxnFromChangeStreams() != null) {
849+
throw newSpannerException(
850+
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
851+
}
837852
final ExecuteBatchDmlRequest.Builder builder =
838-
getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options));
853+
getExecuteBatchDmlRequestBuilder(statements, options);
839854
try {
840855
com.google.spanner.v1.ExecuteBatchDmlResponse response =
841856
rpc.executeBatchDml(builder.build(), session.getOptions());
@@ -869,10 +884,15 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
869884

870885
@Override
871886
public ApiFuture<long[]> batchUpdateAsync(
872-
Iterable<Statement> statements, UpdateOption... options) {
887+
Iterable<Statement> statements, UpdateOption... updateOptions) {
873888
beforeReadOrQuery();
889+
final Options options = Options.fromUpdateOptions(updateOptions);
890+
if (options.withExcludeTxnFromChangeStreams() != null) {
891+
throw newSpannerException(
892+
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
893+
}
874894
final ExecuteBatchDmlRequest.Builder builder =
875-
getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options));
895+
getExecuteBatchDmlRequestBuilder(statements, options);
876896
ApiFuture<com.google.spanner.v1.ExecuteBatchDmlResponse> response;
877897
try {
878898
// Register the update as an async operation that must finish before the transaction may

0 commit comments

Comments
 (0)