Skip to content

Commit 979ce94

Browse files
authored
feat(spanner): add support for change streams transaction exclusion option (#9779)
1 parent c22943d commit 979ce94

File tree

5 files changed

+293
-19
lines changed

5 files changed

+293
-19
lines changed

spanner/client.go

+25-7
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,10 @@ type applyOption struct {
683683
transactionTag string
684684
// priority is the RPC priority that is used for the commit operation.
685685
priority sppb.RequestOptions_Priority
686+
// If excludeTxnFromChangeStreams == true, mutations from this Client.Apply
687+
// will not be recorded in allowed tracking change streams with DDL option
688+
// allow_txn_exclusion=true.
689+
excludeTxnFromChangeStreams bool
686690
}
687691

688692
// An ApplyOption is an optional argument to Apply.
@@ -721,6 +725,13 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
721725
}
722726
}
723727

728+
// ExcludeTxnFromChangeStreams returns an ApplyOptions that sets whether to exclude recording this commit operation from allowed tracking change streams.
729+
func ExcludeTxnFromChangeStreams() ApplyOption {
730+
return func(ao *applyOption) {
731+
ao.excludeTxnFromChangeStreams = true
732+
}
733+
}
734+
724735
// Apply applies a list of mutations atomically to the database.
725736
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
726737
ao := &applyOption{}
@@ -739,10 +750,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
739750
if !ao.atLeastOnce {
740751
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
741752
return t.BufferWrite(ms)
742-
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
753+
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
743754
return resp.CommitTs, err
744755
}
745-
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader}
756+
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
746757
return t.applyAtLeastOnce(ctx, ms...)
747758
}
748759

@@ -753,14 +764,20 @@ type BatchWriteOptions struct {
753764

754765
// The transaction tag to use for this request.
755766
TransactionTag string
767+
768+
// If excludeTxnFromChangeStreams == true, modifications from all transactions
769+
// in this batch write request will not be recorded in allowed tracking
770+
// change treams with DDL option allow_txn_exclusion=true.
771+
ExcludeTxnFromChangeStreams bool
756772
}
757773

758774
// merge combines two BatchWriteOptions such that the input parameter will have higher
759775
// order of precedence.
760776
func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
761777
merged := BatchWriteOptions{
762-
TransactionTag: bwo.TransactionTag,
763-
Priority: bwo.Priority,
778+
TransactionTag: bwo.TransactionTag,
779+
Priority: bwo.Priority,
780+
ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams || opts.ExcludeTxnFromChangeStreams,
764781
}
765782
if opts.TransactionTag != "" {
766783
merged.TransactionTag = opts.TransactionTag
@@ -915,9 +932,10 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
915932
var md metadata.MD
916933
sh.updateLastUseTime()
917934
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
918-
Session: sh.getID(),
919-
MutationGroups: mgsPb,
920-
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
935+
Session: sh.getID(),
936+
MutationGroups: mgsPb,
937+
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
938+
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
921939
}, gax.WithGRPCOptions(grpc.Header(&md)))
922940

923941
if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {

spanner/client_test.go

+200
Original file line numberDiff line numberDiff line change
@@ -5338,3 +5338,203 @@ func TestClient_NestedReadWriteTransactionWithTag_InnerBlindWrite(t *testing.T)
53385338
t.Fatalf("transaction tag mismatch\nGot: %s\nWant: %s", g, w)
53395339
}
53405340
}
5341+
5342+
func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_ExecuteSqlRequest(t *testing.T) {
5343+
server, client, teardown := setupMockedTestServer(t)
5344+
defer teardown()
5345+
ctx := context.Background()
5346+
5347+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5348+
_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
5349+
if err != nil {
5350+
return err
5351+
}
5352+
return nil
5353+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5354+
if err != nil {
5355+
t.Fatalf("Failed to execute the transaction: %s", err)
5356+
}
5357+
requests := drainRequestsFromServer(server.TestSpanner)
5358+
if err := compareRequests([]interface{}{
5359+
&sppb.BatchCreateSessionsRequest{},
5360+
&sppb.ExecuteSqlRequest{},
5361+
&sppb.CommitRequest{}}, requests); err != nil {
5362+
t.Fatal(err)
5363+
}
5364+
if !requests[1].(*sppb.ExecuteSqlRequest).Transaction.GetBegin().ExcludeTxnFromChangeStreams {
5365+
t.Fatal("Transaction is not set to be excluded from change streams")
5366+
}
5367+
}
5368+
5369+
func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BufferWrite(t *testing.T) {
5370+
server, client, teardown := setupMockedTestServer(t)
5371+
defer teardown()
5372+
ctx := context.Background()
5373+
5374+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5375+
if err := tx.BufferWrite([]*Mutation{
5376+
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
5377+
}); err != nil {
5378+
return err
5379+
}
5380+
return nil
5381+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5382+
if err != nil {
5383+
t.Fatalf("Failed to execute the transaction: %s", err)
5384+
}
5385+
requests := drainRequestsFromServer(server.TestSpanner)
5386+
if err := compareRequests([]interface{}{
5387+
&sppb.BatchCreateSessionsRequest{},
5388+
&sppb.BeginTransactionRequest{},
5389+
&sppb.CommitRequest{}}, requests); err != nil {
5390+
t.Fatal(err)
5391+
}
5392+
if !requests[1].(*sppb.BeginTransactionRequest).Options.ExcludeTxnFromChangeStreams {
5393+
t.Fatal("Transaction is not set to be excluded from change streams")
5394+
}
5395+
}
5396+
5397+
func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BatchUpdate(t *testing.T) {
5398+
server, client, teardown := setupMockedTestServer(t)
5399+
defer teardown()
5400+
ctx := context.Background()
5401+
5402+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5403+
_, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
5404+
if err != nil {
5405+
return err
5406+
}
5407+
return nil
5408+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5409+
if err != nil {
5410+
t.Fatalf("Failed to execute the transaction: %s", err)
5411+
}
5412+
requests := drainRequestsFromServer(server.TestSpanner)
5413+
if err := compareRequests([]interface{}{
5414+
&sppb.BatchCreateSessionsRequest{},
5415+
&sppb.ExecuteBatchDmlRequest{},
5416+
&sppb.CommitRequest{}}, requests); err != nil {
5417+
t.Fatal(err)
5418+
}
5419+
if !requests[1].(*sppb.ExecuteBatchDmlRequest).Transaction.GetBegin().ExcludeTxnFromChangeStreams {
5420+
t.Fatal("Transaction is not set to be excluded from change streams")
5421+
}
5422+
}
5423+
5424+
func TestClient_RequestLevelDMLWithExcludeTxnFromChangeStreams_Failed(t *testing.T) {
5425+
_, client, teardown := setupMockedTestServer(t)
5426+
defer teardown()
5427+
ctx := context.Background()
5428+
5429+
// Test normal DML
5430+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5431+
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
5432+
if err != nil {
5433+
return err
5434+
}
5435+
return nil
5436+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5437+
if err == nil {
5438+
t.Fatalf("Missing expected exception")
5439+
}
5440+
msg := "cannot set exclude transaction from change streams for a request-level DML statement."
5441+
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
5442+
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
5443+
}
5444+
5445+
// Test batch DML
5446+
_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5447+
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
5448+
if err != nil {
5449+
return err
5450+
}
5451+
return nil
5452+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5453+
if err == nil {
5454+
t.Fatalf("Missing expected exception")
5455+
}
5456+
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
5457+
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
5458+
}
5459+
}
5460+
5461+
func TestClient_ApplyExcludeTxnFromChangeStreams(t *testing.T) {
5462+
server, client, teardown := setupMockedTestServer(t)
5463+
defer teardown()
5464+
5465+
ms := []*Mutation{
5466+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
5467+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
5468+
}
5469+
5470+
_, err := client.Apply(context.Background(), ms, ExcludeTxnFromChangeStreams())
5471+
if err != nil {
5472+
t.Fatal(err)
5473+
}
5474+
requests := drainRequestsFromServer(server.TestSpanner)
5475+
if err := compareRequests([]interface{}{
5476+
&sppb.BatchCreateSessionsRequest{},
5477+
&sppb.BeginTransactionRequest{},
5478+
&sppb.CommitRequest{}}, requests); err != nil {
5479+
t.Fatal(err)
5480+
}
5481+
if !requests[1].(*sppb.BeginTransactionRequest).Options.ExcludeTxnFromChangeStreams {
5482+
t.Fatal("Transaction is not set to be excluded from change streams")
5483+
}
5484+
}
5485+
5486+
func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
5487+
server, client, teardown := setupMockedTestServer(t)
5488+
defer teardown()
5489+
5490+
ms := []*Mutation{
5491+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
5492+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
5493+
}
5494+
5495+
_, err := client.Apply(context.Background(), ms, []ApplyOption{ExcludeTxnFromChangeStreams(), ApplyAtLeastOnce()}...)
5496+
if err != nil {
5497+
t.Fatal(err)
5498+
}
5499+
requests := drainRequestsFromServer(server.TestSpanner)
5500+
if err := compareRequests([]interface{}{
5501+
&sppb.BatchCreateSessionsRequest{},
5502+
&sppb.CommitRequest{}}, requests); err != nil {
5503+
t.Fatal(err)
5504+
}
5505+
if !requests[1].(*sppb.CommitRequest).Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
5506+
t.Fatal("Transaction is not set to be excluded from change streams")
5507+
}
5508+
}
5509+
5510+
func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
5511+
server, client, teardown := setupMockedTestServer(t)
5512+
defer teardown()
5513+
5514+
mutationGroups := []*MutationGroup{
5515+
{[]*Mutation{
5516+
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}},
5517+
}},
5518+
}
5519+
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true})
5520+
responseCount := 0
5521+
doFunc := func(r *sppb.BatchWriteResponse) error {
5522+
responseCount++
5523+
return nil
5524+
}
5525+
if err := iter.Do(doFunc); err != nil {
5526+
t.Fatal(err)
5527+
}
5528+
if responseCount != len(mutationGroups) {
5529+
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
5530+
}
5531+
requests := drainRequestsFromServer(server.TestSpanner)
5532+
if err := compareRequests([]interface{}{
5533+
&sppb.BatchCreateSessionsRequest{},
5534+
&sppb.BatchWriteRequest{}}, requests); err != nil {
5535+
t.Fatal(err)
5536+
}
5537+
if !requests[1].(*sppb.BatchWriteRequest).ExcludeTxnFromChangeStreams {
5538+
t.Fatal("Transaction is not set to be excluded from change streams")
5539+
}
5540+
}

spanner/pdml.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
8484
// Execute the PDML and retry if the transaction is aborted.
8585
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
8686
for {
87-
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req)
87+
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req, options)
8888
if err == nil {
8989
return count, nil
9090
}
@@ -106,14 +106,15 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
106106
// 3. Execute the update statement on the PDML transaction
107107
//
108108
// Note that PDML transactions cannot be committed or rolled back.
109-
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
109+
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest, options QueryOptions) (count int64, err error) {
110110
var md metadata.MD
111111
sh.updateLastUseTime()
112112
// Begin transaction.
113113
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
114114
Session: sh.getID(),
115115
Options: &sppb.TransactionOptions{
116-
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
116+
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
117+
ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams,
117118
},
118119
})
119120
if err != nil {

spanner/pdml_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,25 @@ func TestPartitionedUpdate_Tagging(t *testing.T) {
179179
}
180180
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
181181
}
182+
183+
func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) {
184+
ctx := context.Background()
185+
server, client, teardown := setupMockedTestServer(t)
186+
defer teardown()
187+
188+
_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{ExcludeTxnFromChangeStreams: true})
189+
if err != nil {
190+
t.Fatalf("expect no errors, but got %v", err)
191+
}
192+
requests := drainRequestsFromServer(server.TestSpanner)
193+
if err := compareRequests([]interface{}{
194+
&sppb.BatchCreateSessionsRequest{},
195+
&sppb.BeginTransactionRequest{},
196+
&sppb.ExecuteSqlRequest{}}, requests); err != nil {
197+
t.Fatal(err)
198+
}
199+
200+
if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
201+
t.Fatal("Transaction is not set to be excluded from change streams")
202+
}
203+
}

0 commit comments

Comments
 (0)