Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support for change streams transaction exclusion option #9779

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ type applyOption struct {
transactionTag string
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
// If excludeTxnFromChangeStreams == true, mutations from this Client.Apply
// will not be recorded in allowed tracking change streams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -721,6 +725,13 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
}
}

// ExcludeTxnFromChangeStreams returns an ApplyOptions that sets whether to exclude recording this commit operation from allowed tracking change streams.
func ExcludeTxnFromChangeStreams() ApplyOption {
return func(ao *applyOption) {
ao.excludeTxnFromChangeStreams = true
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -739,10 +750,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
if !ao.atLeastOnce {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand All @@ -753,14 +764,20 @@ type BatchWriteOptions struct {

// The transaction tag to use for this request.
TransactionTag string

// If excludeTxnFromChangeStreams == true, modifications from all transactions
// in this batch write request will not be recorded in allowed tracking
// change treams with DDL option allow_txn_exclusion=true.
ExcludeTxnFromChangeStreams bool
}

// merge combines two BatchWriteOptions such that the input parameter will have higher
// order of precedence.
func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
merged := BatchWriteOptions{
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams || opts.ExcludeTxnFromChangeStreams,
}
if opts.TransactionTag != "" {
merged.TransactionTag = opts.TransactionTag
Expand Down Expand Up @@ -915,9 +932,10 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
var md metadata.MD
sh.updateLastUseTime()
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
}, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {
Expand Down
200 changes: 200 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5338,3 +5338,203 @@ func TestClient_NestedReadWriteTransactionWithTag_InnerBlindWrite(t *testing.T)
t.Fatalf("transaction tag mismatch\nGot: %s\nWant: %s", g, w)
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_ExecuteSqlRequest(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.ExecuteSqlRequest).Transaction.GetBegin().ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BufferWrite(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BeginTransactionRequest).Options.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BatchUpdate(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteBatchDmlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.ExecuteBatchDmlRequest).Transaction.GetBegin().ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_RequestLevelDMLWithExcludeTxnFromChangeStreams_Failed(t *testing.T) {
_, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

// Test normal DML
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err == nil {
t.Fatalf("Missing expected exception")
}
msg := "cannot set exclude transaction from change streams for a request-level DML statement."
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}

// Test batch DML
_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err == nil {
t.Fatalf("Missing expected exception")
}
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}
}

func TestClient_ApplyExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}

_, err := client.Apply(context.Background(), ms, ExcludeTxnFromChangeStreams())
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BeginTransactionRequest).Options.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}

_, err := client.Apply(context.Background(), ms, []ApplyOption{ExcludeTxnFromChangeStreams(), ApplyAtLeastOnce()}...)
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.CommitRequest).Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

mutationGroups := []*MutationGroup{
{[]*Mutation{
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}},
}},
}
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true})
responseCount := 0
doFunc := func(r *sppb.BatchWriteResponse) error {
responseCount++
return nil
}
if err := iter.Do(doFunc); err != nil {
t.Fatal(err)
}
if responseCount != len(mutationGroups) {
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchWriteRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BatchWriteRequest).ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}
7 changes: 4 additions & 3 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// Execute the PDML and retry if the transaction is aborted.
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
for {
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req)
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req, options)
if err == nil {
return count, nil
}
Expand All @@ -106,14 +106,15 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// 3. Execute the update statement on the PDML transaction
//
// Note that PDML transactions cannot be committed or rolled back.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest, options QueryOptions) (count int64, err error) {
var md metadata.MD
sh.updateLastUseTime()
// Begin transaction.
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams,
},
})
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions spanner/pdml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,25 @@ func TestPartitionedUpdate_Tagging(t *testing.T) {
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
}

func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("expect no errors, but got %v", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{}}, requests); err != nil {
t.Fatal(err)
}

if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}
Loading
Loading