Skip to content

Commit b2aa09d

Browse files
authored
Merge branch 'googleapis:main' into main
2 parents 4a6aa8e + 2c8ecf6 commit b2aa09d

File tree

3 files changed

+55
-1
lines changed

3 files changed

+55
-1
lines changed

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java

+37
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.google.cloud.spanner.Mutation;
5353
import com.google.cloud.spanner.Mutation.WriteBuilder;
5454
import com.google.cloud.spanner.Options;
55+
import com.google.cloud.spanner.Options.RpcPriority;
5556
import com.google.cloud.spanner.Partition;
5657
import com.google.cloud.spanner.PartitionOptions;
5758
import com.google.cloud.spanner.ReadContext;
@@ -128,6 +129,8 @@
128129
import com.google.spanner.executor.v1.MutationAction.Mod;
129130
import com.google.spanner.executor.v1.MutationAction.UpdateArgs;
130131
import com.google.spanner.executor.v1.OperationResponse;
132+
import com.google.spanner.executor.v1.PartitionedUpdateAction;
133+
import com.google.spanner.executor.v1.PartitionedUpdateAction.ExecutePartitionedUpdateOptions;
131134
import com.google.spanner.executor.v1.QueryAction;
132135
import com.google.spanner.executor.v1.ReadAction;
133136
import com.google.spanner.executor.v1.RestoreCloudDatabaseAction;
@@ -886,6 +889,13 @@ private Status executeAction(
886889
} else if (action.hasExecutePartition()) {
887890
return executeExecutePartition(
888891
action.getExecutePartition(), outcomeSender, executionContext);
892+
} else if (action.hasPartitionedUpdate()) {
893+
if (dbPath == null) {
894+
throw SpannerExceptionFactory.newSpannerException(
895+
ErrorCode.INVALID_ARGUMENT, "Database path must be set for this action");
896+
}
897+
DatabaseClient dbClient = getClient().getDatabaseClient(DatabaseId.of(dbPath));
898+
return executePartitionedUpdate(action.getPartitionedUpdate(), dbClient, outcomeSender);
889899
} else if (action.hasCloseBatchTxn()) {
890900
return executeCloseBatchTxn(action.getCloseBatchTxn(), outcomeSender, executionContext);
891901
} else if (action.hasExecuteChangeStreamQuery()) {
@@ -1974,6 +1984,33 @@ private Status executeExecutePartition(
19741984
}
19751985
}
19761986

1987+
/** Execute a partitioned update which runs different partitions in parallel. */
1988+
private Status executePartitionedUpdate(
1989+
PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) {
1990+
try {
1991+
ExecutePartitionedUpdateOptions options = action.getOptions();
1992+
Long count =
1993+
dbClient.executePartitionedUpdate(
1994+
Statement.of(action.getUpdate().getSql()),
1995+
Options.tag(options.getTag()),
1996+
Options.priority(RpcPriority.fromProto(options.getRpcPriority())));
1997+
SpannerActionOutcome outcome =
1998+
SpannerActionOutcome.newBuilder()
1999+
.setStatus(toProto(Status.OK))
2000+
.addDmlRowsModified(count)
2001+
.build();
2002+
sender.sendOutcome(outcome);
2003+
return sender.finishWithOK();
2004+
} catch (SpannerException e) {
2005+
return sender.finishWithError(toStatus(e));
2006+
} catch (Exception e) {
2007+
return sender.finishWithError(
2008+
toStatus(
2009+
SpannerExceptionFactory.newSpannerException(
2010+
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
2011+
}
2012+
}
2013+
19772014
/** Build a child partition record proto out of childPartitionRecord returned by client. */
19782015
private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord)
19792016
throws Exception {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,21 @@ public final class Options implements Serializable {
3232
public enum RpcPriority {
3333
LOW(Priority.PRIORITY_LOW),
3434
MEDIUM(Priority.PRIORITY_MEDIUM),
35-
HIGH(Priority.PRIORITY_HIGH);
35+
HIGH(Priority.PRIORITY_HIGH),
36+
UNSPECIFIED(Priority.PRIORITY_UNSPECIFIED);
3637

3738
private final Priority proto;
3839

3940
RpcPriority(Priority proto) {
4041
this.proto = Preconditions.checkNotNull(proto);
4142
}
43+
44+
public static RpcPriority fromProto(Priority proto) {
45+
for (RpcPriority e : RpcPriority.values()) {
46+
if (e.proto.equals(proto)) return e;
47+
}
48+
return RpcPriority.UNSPECIFIED;
49+
}
4250
}
4351

4452
/** Marker interface to mark options applicable to both Read and Query operations */

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,15 @@ public void testUpdateOptionsPriority() {
300300
assertEquals("priority: " + priority + " ", options.toString());
301301
}
302302

303+
@Test
304+
public void testRpcPriorityEnumFromProto() {
305+
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_LOW), RpcPriority.LOW);
306+
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_MEDIUM), RpcPriority.MEDIUM);
307+
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_HIGH), RpcPriority.HIGH);
308+
assertEquals(RpcPriority.fromProto(Priority.PRIORITY_UNSPECIFIED), RpcPriority.UNSPECIFIED);
309+
assertEquals(RpcPriority.fromProto(null), RpcPriority.UNSPECIFIED);
310+
}
311+
303312
@Test
304313
public void testTransactionOptionsHashCode() {
305314
Options option1 = Options.fromTransactionOptions();

0 commit comments

Comments
 (0)