@@ -76,6 +76,10 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
76
76
private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
77
77
"Transaction has already committed" ;
78
78
79
+ private static final String DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE =
80
+ "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests. "
81
+ + "This option should be set at the transaction level." ;
82
+
79
83
@ VisibleForTesting
80
84
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
81
85
@@ -371,7 +375,9 @@ public void run() {
371
375
if (transactionId == null && transactionIdFuture == null ) {
372
376
requestBuilder .setSingleUseTransaction (
373
377
TransactionOptions .newBuilder ()
374
- .setReadWrite (TransactionOptions .ReadWrite .getDefaultInstance ()));
378
+ .setReadWrite (TransactionOptions .ReadWrite .getDefaultInstance ())
379
+ .setExcludeTxnFromChangeStreams (
380
+ options .withExcludeTxnFromChangeStreams () == Boolean .TRUE ));
375
381
} else {
376
382
requestBuilder .setTransactionId (
377
383
transactionId == null
@@ -725,14 +731,16 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
725
731
}
726
732
727
733
private ResultSet internalExecuteUpdate (
728
- Statement statement , QueryMode queryMode , UpdateOption ... options ) {
734
+ Statement statement , QueryMode queryMode , UpdateOption ... updateOptions ) {
729
735
beforeReadOrQuery ();
736
+ final Options options = Options .fromUpdateOptions (updateOptions );
737
+ if (options .withExcludeTxnFromChangeStreams () != null ) {
738
+ throw newSpannerException (
739
+ ErrorCode .INVALID_ARGUMENT , DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE );
740
+ }
730
741
final ExecuteSqlRequest .Builder builder =
731
742
getExecuteSqlRequestBuilder (
732
- statement ,
733
- queryMode ,
734
- Options .fromUpdateOptions (options ),
735
- /* withTransactionSelector = */ true );
743
+ statement , queryMode , options , /* withTransactionSelector = */ true );
736
744
try {
737
745
com .google .spanner .v1 .ResultSet resultSet =
738
746
rpc .executeQuery (builder .build (), session .getOptions (), isRouteToLeader ());
@@ -753,14 +761,16 @@ private ResultSet internalExecuteUpdate(
753
761
}
754
762
755
763
@ Override
756
- public ApiFuture <Long > executeUpdateAsync (Statement statement , UpdateOption ... options ) {
764
+ public ApiFuture <Long > executeUpdateAsync (Statement statement , UpdateOption ... updateOptions ) {
757
765
beforeReadOrQuery ();
766
+ final Options options = Options .fromUpdateOptions (updateOptions );
767
+ if (options .withExcludeTxnFromChangeStreams () != null ) {
768
+ throw newSpannerException (
769
+ ErrorCode .INVALID_ARGUMENT , DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE );
770
+ }
758
771
final ExecuteSqlRequest .Builder builder =
759
772
getExecuteSqlRequestBuilder (
760
- statement ,
761
- QueryMode .NORMAL ,
762
- Options .fromUpdateOptions (options ),
763
- /* withTransactionSelector = */ true );
773
+ statement , QueryMode .NORMAL , options , /* withTransactionSelector = */ true );
764
774
final ApiFuture <com .google .spanner .v1 .ResultSet > resultSet ;
765
775
try {
766
776
// Register the update as an async operation that must finish before the transaction may
@@ -832,10 +842,15 @@ private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlRespon
832
842
}
833
843
834
844
@ Override
835
- public long [] batchUpdate (Iterable <Statement > statements , UpdateOption ... options ) {
845
+ public long [] batchUpdate (Iterable <Statement > statements , UpdateOption ... updateOptions ) {
836
846
beforeReadOrQuery ();
847
+ final Options options = Options .fromUpdateOptions (updateOptions );
848
+ if (options .withExcludeTxnFromChangeStreams () != null ) {
849
+ throw newSpannerException (
850
+ ErrorCode .INVALID_ARGUMENT , DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE );
851
+ }
837
852
final ExecuteBatchDmlRequest .Builder builder =
838
- getExecuteBatchDmlRequestBuilder (statements , Options . fromUpdateOptions ( options ) );
853
+ getExecuteBatchDmlRequestBuilder (statements , options );
839
854
try {
840
855
com .google .spanner .v1 .ExecuteBatchDmlResponse response =
841
856
rpc .executeBatchDml (builder .build (), session .getOptions ());
@@ -869,10 +884,15 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
869
884
870
885
@ Override
871
886
public ApiFuture <long []> batchUpdateAsync (
872
- Iterable <Statement > statements , UpdateOption ... options ) {
887
+ Iterable <Statement > statements , UpdateOption ... updateOptions ) {
873
888
beforeReadOrQuery ();
889
+ final Options options = Options .fromUpdateOptions (updateOptions );
890
+ if (options .withExcludeTxnFromChangeStreams () != null ) {
891
+ throw newSpannerException (
892
+ ErrorCode .INVALID_ARGUMENT , DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE );
893
+ }
874
894
final ExecuteBatchDmlRequest .Builder builder =
875
- getExecuteBatchDmlRequestBuilder (statements , Options . fromUpdateOptions ( options ) );
895
+ getExecuteBatchDmlRequestBuilder (statements , options );
876
896
ApiFuture <com .google .spanner .v1 .ExecuteBatchDmlResponse > response ;
877
897
try {
878
898
// Register the update as an async operation that must finish before the transaction may
0 commit comments