Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option to indicate that a statement is the last in a transaction #3647

Merged
merged 7 commits into from
Feb 17, 2025
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.53.0</version>
<version>26.54.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,9 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (!isReadOnly()) {
builder.setSeqno(getSeqNo());
}
if (options.hasLastStatement()) {
builder.setLastStatement(options.isLastStatement());
}
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
return builder;
Expand Down Expand Up @@ -743,6 +746,9 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
if (selector != null) {
builder.setTransaction(selector);
}
if (options.hasLastStatement()) {
builder.setLastStatements(options.isLastStatement());
}
builder.setSeqno(getSeqNo());
builder.setRequestOptions(buildRequestOptions(options));
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public interface ReadQueryUpdateTransactionOption
/** Marker interface to mark options applicable to Update and Write operations */
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}

/** Marker interface for options that can be used with both executeQuery and executeUpdate. */
public interface QueryUpdateOption extends QueryOption, UpdateOption {}

/**
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
* API.
Expand Down Expand Up @@ -236,6 +239,20 @@ public static DataBoostQueryOption dataBoostEnabled(Boolean dataBoostEnabled) {
return new DataBoostQueryOption(dataBoostEnabled);
}

/**
* If set to true, this option marks the end of the transaction. The transaction should be
* committed or aborted after this statement executes, and attempts to execute any other requests
* against this transaction (including reads and queries) will be rejected. Mixing mutations with
* statements that are marked as the last statement is not allowed.
*
* <p>For DML statements, setting this option may cause some error reporting to be deferred until
* commit time (e.g. validation of unique constraints). Given this, successful execution of a DML
* statement should not be assumed until the transaction commits.
*/
public static QueryUpdateOption lastStatement() {
return new LastStatementUpdateOption();
}

/**
* Specifying this will cause the list operation to start fetching the record from this onwards.
*/
Expand Down Expand Up @@ -494,6 +511,7 @@ void appendToOptions(Options options) {
private DecodeMode decodeMode;
private RpcOrderBy orderBy;
private RpcLockHint lockHint;
private Boolean lastStatement;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -630,6 +648,14 @@ OrderBy orderBy() {
return orderBy == null ? null : orderBy.proto;
}

boolean hasLastStatement() {
return lastStatement != null;
}

Boolean isLastStatement() {
return lastStatement;
}

boolean hasLockHint() {
return lockHint != null;
}
Expand Down Expand Up @@ -694,6 +720,9 @@ public String toString() {
if (orderBy != null) {
b.append("orderBy: ").append(orderBy).append(' ');
}
if (lastStatement != null) {
b.append("lastStatement: ").append(lastStatement).append(' ');
}
if (lockHint != null) {
b.append("lockHint: ").append(lockHint).append(' ');
}
Expand Down Expand Up @@ -737,6 +766,7 @@ public boolean equals(Object o) {
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions())
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint());
}

Expand Down Expand Up @@ -797,6 +827,9 @@ public int hashCode() {
if (orderBy != null) {
result = 31 * result + orderBy.hashCode();
}
if (lastStatement != null) {
result = 31 * result + lastStatement.hashCode();
}
if (lockHint != null) {
result = 31 * result + lockHint.hashCode();
}
Expand Down Expand Up @@ -965,4 +998,24 @@ public boolean equals(Object o) {
return Objects.equals(filter, ((FilterOption) o).filter);
}
}

static final class LastStatementUpdateOption extends InternalOption implements QueryUpdateOption {

LastStatementUpdateOption() {}

@Override
void appendToOptions(Options options) {
options.lastStatement = true;
}

@Override
public int hashCode() {
return LastStatementUpdateOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ static ErrorDetails extractErrorDetails(Throwable cause) {
if (cause instanceof ApiException) {
return ((ApiException) cause).getErrorDetails();
}
if (cause instanceof SpannerException) {
return ((SpannerException) cause).getErrorDetails();
}
prevCause = cause;
cause = cause.getCause();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.QueryUpdateOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadOnlyTransaction;
Expand Down Expand Up @@ -298,7 +299,8 @@ private ApiFuture<ResultSet> executeDmlReturningAsync(
writeTransaction.run(
transaction ->
DirectExecuteResultSet.ofResultSet(
transaction.executeQuery(update.getStatement(), options)));
transaction.executeQuery(
update.getStatement(), appendLastStatement(options))));
Comment on lines +302 to +303
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick clarification: Since a SingleUseTransaction can only execute one query, do we implicitly mark that statement as the last statement? The changes in this file seem to be related to this, correct?

What happens if we don’t mark it as the last statement in SingleUseTransaction?
Since the customer cannot run additional queries anyway, I’m trying to understand the benefit of marking it as the last statement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This is only relevant in read/write transactions, so in this case only for DML statements with a THEN RETURN clause, which are also executed using executeQuery(..).

Your observation that we automatically mark a statement in a SingleUseTransaction as the last_statement is correct. We do this, because we know that this will be the only statement in this transaction. By giving Spanner this hint, Spanner can internally optimise the execution of this statement by skipping some verifications, that are also executed during the Commit call.

And more in general: By supplying this hint to Spanner, Spanner could in the future also consider other possible optimizations that are possible when it knows that the current statement will be the last in a read/write transaction.

state = UnitOfWorkState.COMMITTED;
return resultSet;
} catch (Throwable t) {
Expand Down Expand Up @@ -554,11 +556,15 @@ private ApiFuture<Tuple<Long, ResultSet>> executeTransactionalUpdateAsync(
transaction -> {
if (analyzeMode == AnalyzeMode.NONE) {
return Tuple.of(
transaction.executeUpdate(update.getStatement(), options), null);
transaction.executeUpdate(
update.getStatement(), appendLastStatement(options)),
null);
}
ResultSet resultSet =
transaction.analyzeUpdateStatement(
update.getStatement(), analyzeMode.getQueryAnalyzeMode(), options);
update.getStatement(),
analyzeMode.getQueryAnalyzeMode(),
appendLastStatement(options));
return Tuple.of(null, resultSet);
});
state = UnitOfWorkState.COMMITTED;
Expand All @@ -582,6 +588,29 @@ private ApiFuture<Tuple<Long, ResultSet>> executeTransactionalUpdateAsync(
return transactionalResult;
}

private static final QueryUpdateOption[] LAST_STATEMENT_OPTIONS =
new QueryUpdateOption[] {Options.lastStatement()};

private static UpdateOption[] appendLastStatement(UpdateOption[] options) {
if (options.length == 0) {
return LAST_STATEMENT_OPTIONS;
}
UpdateOption[] result = new UpdateOption[options.length + 1];
System.arraycopy(options, 0, result, 0, options.length);
result[result.length - 1] = LAST_STATEMENT_OPTIONS[0];
return result;
}

private static QueryOption[] appendLastStatement(QueryOption[] options) {
if (options.length == 0) {
return LAST_STATEMENT_OPTIONS;
}
QueryOption[] result = new QueryOption[options.length + 1];
System.arraycopy(options, 0, result, 0, options.length);
result[result.length - 1] = LAST_STATEMENT_OPTIONS[0];
return result;
}
Comment on lines +594 to +612
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: We cannot generalize these two methods into one, due to the large number of different possible interfaces for Query/Update/Read/QueryOrUpdate/QueryOrUpdateOrTransaction/Options.


/**
* Adds a callback to the given future that retries the update statement using Partitioned DML if
* the original statement fails with a {@link TransactionMutationLimitExceededException}.
Expand Down Expand Up @@ -719,7 +748,8 @@ private ApiFuture<long[]> executeTransactionalBatchUpdateAsync(
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(updates, ParsedStatement::getStatement), options);
Iterables.transform(updates, ParsedStatement::getStatement),
appendLastStatement(options));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -266,6 +267,42 @@ public void testGetExecuteBatchDmlRequestBuilderWithPriority() {
assertEquals(Priority.PRIORITY_LOW, request.getRequestOptions().getPriority());
}

@Test
public void testExecuteSqlLastStatement() {
assertFalse(
context
.getExecuteSqlRequestBuilder(
Statement.of("insert into test (id) values (1)"),
QueryMode.NORMAL,
Options.fromUpdateOptions(),
false)
.getLastStatement());
assertTrue(
context
.getExecuteSqlRequestBuilder(
Statement.of("insert into test (id) values (1)"),
QueryMode.NORMAL,
Options.fromUpdateOptions(Options.lastStatement()),
false)
.getLastStatement());
}

@Test
public void testExecuteBatchDmlLastStatement() {
assertFalse(
context
.getExecuteBatchDmlRequestBuilder(
Collections.singleton(Statement.of("insert into test (id) values (1)")),
Options.fromUpdateOptions())
.getLastStatements());
assertTrue(
context
.getExecuteBatchDmlRequestBuilder(
Collections.singleton(Statement.of("insert into test (id) values (1)")),
Options.fromUpdateOptions(Options.lastStatement()))
.getLastStatements());
}

public void executeSqlRequestBuilderWithRequestOptions() {
ExecuteSqlRequest request =
context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,22 @@ public void updateOptionsExcludeTxnFromChangeStreams() {
assertNull(option3.withExcludeTxnFromChangeStreams());
assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true");
}

@Test
public void testLastStatement() {
Options option1 = Options.fromUpdateOptions(Options.lastStatement());
Options option2 = Options.fromUpdateOptions(Options.lastStatement());
Options option3 = Options.fromUpdateOptions();

assertEquals(option1, option2);
assertEquals(option1.hashCode(), option2.hashCode());
assertNotEquals(option1, option3);
assertNotEquals(option1.hashCode(), option3.hashCode());

assertTrue(option1.isLastStatement());
assertThat(option1.toString()).contains("lastStatement: true");

assertNull(option3.isLastStatement());
assertThat(option3.toString()).doesNotContain("lastStatement: true");
}
}
Loading
Loading