Skip to content

Commit 96b508b

Browse files
committed
refactor: introduce PartitionedUpdateOption
1 parent aa7bc0c commit 96b508b

File tree

8 files changed

+101
-24
lines changed

8 files changed

+101
-24
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+7
Original file line numberDiff line numberDiff line change
@@ -631,4 +631,11 @@
631631
<className>com/google/cloud/spanner/connection/Connection</className>
632632
<method>void setDirectedRead(com.google.spanner.v1.DirectedReadOptions)</method>
633633
</difference>
634+
635+
<difference>
636+
<differenceType>7005</differenceType>
637+
<className>com/google/cloud/spanner/DatabaseClient</className>
638+
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$UpdateOption[])</method>
639+
<to>long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$PartitionedUpdateOption[])</to>
640+
</difference>
634641
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.gax.rpc.ServerStream;
2020
import com.google.cloud.Timestamp;
21+
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
2122
import com.google.cloud.spanner.Options.RpcPriority;
2223
import com.google.cloud.spanner.Options.TransactionOption;
2324
import com.google.cloud.spanner.Options.UpdateOption;
@@ -600,5 +601,5 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
600601
* <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
601602
* idempotent, such as deleting old rows from a very large table.
602603
*/
603-
long executePartitionedUpdate(Statement stmt, UpdateOption... options);
604+
long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options);
604605
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
import com.google.api.gax.rpc.ServerStream;
2020
import com.google.cloud.Timestamp;
21+
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
2122
import com.google.cloud.spanner.Options.TransactionOption;
22-
import com.google.cloud.spanner.Options.UpdateOption;
2323
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
2424
import com.google.cloud.spanner.SpannerImpl.ClosedException;
2525
import com.google.common.annotations.VisibleForTesting;
@@ -240,7 +240,8 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
240240
}
241241

242242
@Override
243-
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
243+
public long executePartitionedUpdate(
244+
final Statement stmt, final PartitionedUpdateOption... options) {
244245
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
245246
try (IScope s = tracer.withSpan(span)) {
246247
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ public interface ReadOption {}
6161
public interface ReadQueryUpdateTransactionOption
6262
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}
6363

64-
/** Marker interface to mark options applicable to Update and Write operations */
65-
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}
66-
6764
/**
6865
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
6966
* API.
@@ -86,8 +83,15 @@ public interface QueryOption {}
8683
/** Marker interface to mark options applicable to write operations */
8784
public interface TransactionOption {}
8885

86+
/** Marker interface to mark options applicable to partitioned update */
87+
public interface PartitionedUpdateOption {}
88+
8989
/** Marker interface to mark options applicable to update operation. */
90-
public interface UpdateOption {}
90+
public interface UpdateOption extends PartitionedUpdateOption {}
91+
92+
/** Marker interface to mark options applicable to partitioned update and write operations */
93+
public interface PartitionedUpdateTransactionOption
94+
extends PartitionedUpdateOption, TransactionOption {}
9195

9296
/** Marker interface to mark options applicable to list operations in admin API. */
9397
public interface ListOption {}
@@ -118,7 +122,7 @@ public static TransactionOption optimisticLock() {
118122
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
119123
* unset.
120124
*/
121-
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
125+
public static PartitionedUpdateTransactionOption excludeTxnFromChangeStreams() {
122126
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
123127
}
124128

@@ -297,7 +301,7 @@ void appendToOptions(Options options) {
297301

298302
/** Option to request the transaction to be excluded from change streams. */
299303
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
300-
implements UpdateTransactionOption {
304+
implements PartitionedUpdateTransactionOption {
301305
@Override
302306
void appendToOptions(Options options) {
303307
options.withExcludeTxnFromChangeStreams = true;
@@ -744,6 +748,16 @@ static Options fromUpdateOptions(UpdateOption... options) {
744748
return updateOptions;
745749
}
746750

751+
static Options fromPartitinoedUpdateOptions(PartitionedUpdateOption... options) {
752+
Options partitionedUpdateOptions = new Options();
753+
for (PartitionedUpdateOption option : options) {
754+
if (option instanceof InternalOption) {
755+
((InternalOption) option).appendToOptions(partitionedUpdateOptions);
756+
}
757+
}
758+
return partitionedUpdateOptions;
759+
}
760+
747761
static Options fromTransactionOptions(TransactionOption... options) {
748762
Options transactionOptions = new Options();
749763
for (TransactionOption option : options) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import com.google.api.gax.rpc.InternalException;
2626
import com.google.api.gax.rpc.ServerStream;
2727
import com.google.api.gax.rpc.UnavailableException;
28-
import com.google.cloud.spanner.Options.UpdateOption;
28+
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
2929
import com.google.cloud.spanner.spi.v1.SpannerRpc;
3030
import com.google.common.annotations.VisibleForTesting;
3131
import com.google.common.base.Stopwatch;
@@ -71,15 +71,17 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
7171
* last seen resume token if the server returns any.
7272
*/
7373
long executeStreamingPartitionedUpdate(
74-
final Statement statement, final Duration timeout, final UpdateOption... updateOptions) {
74+
final Statement statement,
75+
final Duration timeout,
76+
final PartitionedUpdateOption... partitionedUpdateOptions) {
7577
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
7678
LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement");
7779

7880
ByteString resumeToken = ByteString.EMPTY;
7981
boolean foundStats = false;
8082
long updateCount = 0L;
8183
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
82-
Options options = Options.fromUpdateOptions(updateOptions);
84+
Options options = Options.fromPartitinoedUpdateOptions(partitionedUpdateOptions);
8385

8486
try {
8587
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
2727
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
2828
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
29+
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
2930
import com.google.cloud.spanner.Options.TransactionOption;
30-
import com.google.cloud.spanner.Options.UpdateOption;
3131
import com.google.cloud.spanner.SessionClient.SessionId;
3232
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
3333
import com.google.cloud.spanner.spi.v1.SpannerRpc;
@@ -140,7 +140,7 @@ void markUsed(Instant instant) {
140140
}
141141

142142
@Override
143-
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
143+
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) {
144144
setActive(null);
145145
PartitionedDmlTransaction txn =
146146
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.google.cloud.Tuple;
5353
import com.google.cloud.grpc.GrpcTransportOptions;
5454
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
55+
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
5556
import com.google.cloud.spanner.Options.QueryOption;
5657
import com.google.cloud.spanner.Options.ReadOption;
5758
import com.google.cloud.spanner.Options.TransactionOption;
@@ -1270,7 +1271,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
12701271
}
12711272

12721273
@Override
1273-
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
1274+
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) {
12741275
try {
12751276
return get(true).executePartitionedUpdate(stmt, options);
12761277
} finally {
@@ -1470,7 +1471,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
14701471
}
14711472

14721473
@Override
1473-
public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
1474+
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options)
14741475
throws SpannerException {
14751476
try {
14761477
markUsed();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java

+59-8
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,37 @@ public void testUpdateOptionsWithPriorityHashCode() {
460460
assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode());
461461
}
462462

463+
@Test
464+
public void testPartitionedUpdateOptions() {
465+
Options option1 = Options.fromPartitinoedUpdateOptions();
466+
Options option2 = Options.fromPartitinoedUpdateOptions();
467+
assertEquals(option1, option2);
468+
assertEquals(option1.hashCode(), option2.hashCode());
469+
assertEquals(option1.toString(), option2.toString());
470+
assertEquals("", option1.toString());
471+
}
472+
473+
@Test
474+
public void testPartitionedUpdateOptionsWithPriority() {
475+
Options optionsWithHighPriority1 =
476+
Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH));
477+
assertEquals(Priority.PRIORITY_HIGH, optionsWithHighPriority1.priority());
478+
assertEquals("priority: HIGH ", optionsWithHighPriority1.toString());
479+
480+
Options optionsWithHighPriority2 =
481+
Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH));
482+
assertEquals(optionsWithHighPriority1, optionsWithHighPriority2);
483+
assertEquals(optionsWithHighPriority1.hashCode(), optionsWithHighPriority2.hashCode());
484+
assertEquals(optionsWithHighPriority1.toString(), optionsWithHighPriority2.toString());
485+
486+
Options optionsWithMediumPriority =
487+
Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.MEDIUM));
488+
assertEquals(Priority.PRIORITY_MEDIUM, optionsWithMediumPriority.priority());
489+
assertEquals("priority: MEDIUM ", optionsWithMediumPriority.toString());
490+
assertNotEquals(optionsWithHighPriority1, optionsWithMediumPriority);
491+
assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode());
492+
}
493+
463494
@Test
464495
public void testQueryOptionsEquality() {
465496
Options option1 = Options.fromQueryOptions();
@@ -617,6 +648,26 @@ public void updateEquality() {
617648
assertThat(o2.equals(o3)).isFalse();
618649
}
619650

651+
@Test
652+
public void partitionedUpdateWithTag() {
653+
String tag1 = "app=spanner,env=test";
654+
Options o1 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1));
655+
assertEquals(tag1, o1.tag());
656+
assertEquals("tag: " + tag1 + " ", o1.toString());
657+
658+
Options o2 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1));
659+
assertEquals(o1, o2);
660+
assertEquals(o1.hashCode(), o2.hashCode());
661+
assertEquals(o1.toString(), o2.toString());
662+
663+
String tag2 = "app=spanner,env=stage";
664+
Options o3 = Options.fromPartitinoedUpdateOptions(Options.tag(tag2));
665+
assertEquals("tag: " + tag2 + " ", o3.toString());
666+
assertNotEquals(o2, o3);
667+
assertNotEquals(o2.hashCode(), o3.hashCode());
668+
assertNotEquals(o2.toString(), o3.toString());
669+
}
670+
620671
@Test
621672
public void transactionOptionsTest() {
622673
String tag = "app=spanner,env=test";
@@ -706,27 +757,27 @@ public void transactionOptionsExcludeTxnFromChangeStreams() {
706757
assertNotEquals(option1.hashCode(), option3.hashCode());
707758

708759
assertTrue(option1.withExcludeTxnFromChangeStreams());
709-
assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true");
760+
assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString());
710761

711762
assertNull(option3.withExcludeTxnFromChangeStreams());
712-
assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true");
763+
assertEquals("", option3.toString());
713764
}
714765

715766
@Test
716-
public void updateOptionsExcludeTxnFromChangeStreams() {
717-
Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams());
718-
Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams());
719-
Options option3 = Options.fromUpdateOptions();
767+
public void partitionedUpdateOptionsExcludeTxnFromChangeStreams() {
768+
Options option1 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams());
769+
Options option2 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams());
770+
Options option3 = Options.fromPartitinoedUpdateOptions();
720771

721772
assertEquals(option1, option2);
722773
assertEquals(option1.hashCode(), option2.hashCode());
723774
assertNotEquals(option1, option3);
724775
assertNotEquals(option1.hashCode(), option3.hashCode());
725776

726777
assertTrue(option1.withExcludeTxnFromChangeStreams());
727-
assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true");
778+
assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString());
728779

729780
assertNull(option3.withExcludeTxnFromChangeStreams());
730-
assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true");
781+
assertEquals("", option3.toString());
731782
}
732783
}

0 commit comments

Comments
 (0)