Skip to content

Commit 79a3207

Browse files
committed
feat(spanner):add support for change streams transaction exclusion option
1 parent b952f41 commit 79a3207

File tree

5 files changed

+303
-20
lines changed

5 files changed

+303
-20
lines changed

spanner/client.go

+28-7
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,10 @@ type applyOption struct {
684684
transactionTag string
685685
// priority is the RPC priority that is used for the commit operation.
686686
priority sppb.RequestOptions_Priority
687+
// If excludeTxnFromChangeStreams == true, mutations from this Client.Apply
688+
// will not be recorded in allowed tracking change treams with DDL option
689+
// allow_txn_exclusion=true.
690+
excludeTxnFromChangeStreams bool
687691
}
688692

689693
// An ApplyOption is an optional argument to Apply.
@@ -722,6 +726,13 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
722726
}
723727
}
724728

729+
// ExcludeTxnFromChangeStreams returns an ApplyOptions that sets whether to exclude recording this commit operation from allowed tracking change streams.
730+
func ExcludeTxnFromChangeStreams() ApplyOption {
731+
return func(ao *applyOption) {
732+
ao.excludeTxnFromChangeStreams = true
733+
}
734+
}
735+
725736
// Apply applies a list of mutations atomically to the database.
726737
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
727738
ao := &applyOption{}
@@ -740,10 +751,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
740751
if !ao.atLeastOnce {
741752
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
742753
return t.BufferWrite(ms)
743-
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
754+
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
744755
return resp.CommitTs, err
745756
}
746-
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader}
757+
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
747758
return t.applyAtLeastOnce(ctx, ms...)
748759
}
749760

@@ -754,21 +765,30 @@ type BatchWriteOptions struct {
754765

755766
// The transaction tag to use for this request.
756767
TransactionTag string
768+
769+
// If excludeTxnFromChangeStreams == true, modifications from all transactions
770+
// in this batch write request will not be recorded in allowed tracking
771+
// change treams with DDL option allow_txn_exclusion=true.
772+
ExcludeTxnFromChangeStreams bool
757773
}
758774

759775
// merge combines two BatchWriteOptions such that the input parameter will have higher
760776
// order of precedence.
761777
func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
762778
merged := BatchWriteOptions{
763-
TransactionTag: bwo.TransactionTag,
764-
Priority: bwo.Priority,
779+
TransactionTag: bwo.TransactionTag,
780+
Priority: bwo.Priority,
781+
ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams,
765782
}
766783
if opts.TransactionTag != "" {
767784
merged.TransactionTag = opts.TransactionTag
768785
}
769786
if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
770787
merged.Priority = opts.Priority
771788
}
789+
if opts.ExcludeTxnFromChangeStreams {
790+
merged.ExcludeTxnFromChangeStreams = opts.ExcludeTxnFromChangeStreams
791+
}
772792
return merged
773793
}
774794

@@ -916,9 +936,10 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
916936
var md metadata.MD
917937
sh.updateLastUseTime()
918938
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
919-
Session: sh.getID(),
920-
MutationGroups: mgsPb,
921-
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
939+
Session: sh.getID(),
940+
MutationGroups: mgsPb,
941+
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
942+
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
922943
}, gax.WithGRPCOptions(grpc.Header(&md)))
923944

924945
if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {

spanner/client_test.go

+200
Original file line numberDiff line numberDiff line change
@@ -5338,3 +5338,203 @@ func TestClient_NestedReadWriteTransactionWithTag_InnerBlindWrite(t *testing.T)
53385338
t.Fatalf("transaction tag mismatch\nGot: %s\nWant: %s", g, w)
53395339
}
53405340
}
5341+
5342+
func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_ExecuteSqlRequest(t *testing.T) {
5343+
server, client, teardown := setupMockedTestServer(t)
5344+
defer teardown()
5345+
ctx := context.Background()
5346+
5347+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5348+
_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
5349+
if err != nil {
5350+
return err
5351+
}
5352+
return nil
5353+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5354+
if err != nil {
5355+
t.Fatalf("Failed to execute the transaction: %s", err)
5356+
}
5357+
requests := drainRequestsFromServer(server.TestSpanner)
5358+
if err := compareRequests([]interface{}{
5359+
&sppb.BatchCreateSessionsRequest{},
5360+
&sppb.ExecuteSqlRequest{},
5361+
&sppb.CommitRequest{}}, requests); err != nil {
5362+
t.Fatal(err)
5363+
}
5364+
if !requests[1].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetExcludeTxnFromChangeStreams() {
5365+
t.Fatal("Transaction is not set to be excluded from change streams")
5366+
}
5367+
}
5368+
5369+
func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BufferWrite(t *testing.T) {
5370+
server, client, teardown := setupMockedTestServer(t)
5371+
defer teardown()
5372+
ctx := context.Background()
5373+
5374+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5375+
if err := tx.BufferWrite([]*Mutation{
5376+
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
5377+
}); err != nil {
5378+
return err
5379+
}
5380+
return nil
5381+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5382+
if err != nil {
5383+
t.Fatalf("Failed to execute the transaction: %s", err)
5384+
}
5385+
requests := drainRequestsFromServer(server.TestSpanner)
5386+
if err := compareRequests([]interface{}{
5387+
&sppb.BatchCreateSessionsRequest{},
5388+
&sppb.BeginTransactionRequest{},
5389+
&sppb.CommitRequest{}}, requests); err != nil {
5390+
t.Fatal(err)
5391+
}
5392+
if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
5393+
t.Fatal("Transaction is not set to be excluded from change streams")
5394+
}
5395+
}
5396+
5397+
func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BatchUpdate(t *testing.T) {
5398+
server, client, teardown := setupMockedTestServer(t)
5399+
defer teardown()
5400+
ctx := context.Background()
5401+
5402+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5403+
_, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
5404+
if err != nil {
5405+
return err
5406+
}
5407+
return nil
5408+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5409+
if err != nil {
5410+
t.Fatalf("Failed to execute the transaction: %s", err)
5411+
}
5412+
requests := drainRequestsFromServer(server.TestSpanner)
5413+
if err := compareRequests([]interface{}{
5414+
&sppb.BatchCreateSessionsRequest{},
5415+
&sppb.ExecuteBatchDmlRequest{},
5416+
&sppb.CommitRequest{}}, requests); err != nil {
5417+
t.Fatal(err)
5418+
}
5419+
if !requests[1].(*sppb.ExecuteBatchDmlRequest).GetTransaction().GetBegin().GetExcludeTxnFromChangeStreams() {
5420+
t.Fatal("Transaction is not set to be excluded from change streams")
5421+
}
5422+
}
5423+
5424+
func TestClient_RequestLevelDMLWithExcludeTxnFromChangeStreams_Failed(t *testing.T) {
5425+
_, client, teardown := setupMockedTestServer(t)
5426+
defer teardown()
5427+
ctx := context.Background()
5428+
5429+
// Test normal DML
5430+
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5431+
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
5432+
if err != nil {
5433+
return err
5434+
}
5435+
return nil
5436+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5437+
if err == nil {
5438+
t.Fatalf("Missing expected exception")
5439+
}
5440+
msg := "cannot set exclude transaction from change streams for a request-level DML statement."
5441+
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
5442+
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
5443+
}
5444+
5445+
// Test batch DML
5446+
_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
5447+
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
5448+
if err != nil {
5449+
return err
5450+
}
5451+
return nil
5452+
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
5453+
if err == nil {
5454+
t.Fatalf("Missing expected exception")
5455+
}
5456+
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
5457+
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
5458+
}
5459+
}
5460+
5461+
func TestClient_ApplyExcludeTxnFromChangeStreams(t *testing.T) {
5462+
server, client, teardown := setupMockedTestServer(t)
5463+
defer teardown()
5464+
5465+
ms := []*Mutation{
5466+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
5467+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
5468+
}
5469+
5470+
_, err := client.Apply(context.Background(), ms, ExcludeTxnFromChangeStreams())
5471+
if err != nil {
5472+
t.Fatal(err)
5473+
}
5474+
requests := drainRequestsFromServer(server.TestSpanner)
5475+
if err := compareRequests([]interface{}{
5476+
&sppb.BatchCreateSessionsRequest{},
5477+
&sppb.BeginTransactionRequest{},
5478+
&sppb.CommitRequest{}}, requests); err != nil {
5479+
t.Fatal(err)
5480+
}
5481+
if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
5482+
t.Fatal("Transaction is not set to be excluded from change streams")
5483+
}
5484+
}
5485+
5486+
func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
5487+
server, client, teardown := setupMockedTestServer(t)
5488+
defer teardown()
5489+
5490+
ms := []*Mutation{
5491+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
5492+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
5493+
}
5494+
5495+
_, err := client.Apply(context.Background(), ms, []ApplyOption{ExcludeTxnFromChangeStreams(), ApplyAtLeastOnce()}...)
5496+
if err != nil {
5497+
t.Fatal(err)
5498+
}
5499+
requests := drainRequestsFromServer(server.TestSpanner)
5500+
if err := compareRequests([]interface{}{
5501+
&sppb.BatchCreateSessionsRequest{},
5502+
&sppb.CommitRequest{}}, requests); err != nil {
5503+
t.Fatal(err)
5504+
}
5505+
if !requests[1].(*sppb.CommitRequest).GetTransaction().(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
5506+
t.Fatal("Transaction is not set to be excluded from change streams")
5507+
}
5508+
}
5509+
5510+
func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
5511+
server, client, teardown := setupMockedTestServer(t)
5512+
defer teardown()
5513+
5514+
mutationGroups := []*MutationGroup{
5515+
{[]*Mutation{
5516+
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}},
5517+
}},
5518+
}
5519+
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true})
5520+
responseCount := 0
5521+
doFunc := func(r *sppb.BatchWriteResponse) error {
5522+
responseCount++
5523+
return nil
5524+
}
5525+
if err := iter.Do(doFunc); err != nil {
5526+
t.Fatal(err)
5527+
}
5528+
if responseCount != len(mutationGroups) {
5529+
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
5530+
}
5531+
requests := drainRequestsFromServer(server.TestSpanner)
5532+
if err := compareRequests([]interface{}{
5533+
&sppb.BatchCreateSessionsRequest{},
5534+
&sppb.BatchWriteRequest{}}, requests); err != nil {
5535+
t.Fatal(err)
5536+
}
5537+
if !requests[1].(*sppb.BatchWriteRequest).GetExcludeTxnFromChangeStreams() {
5538+
t.Fatal("Transaction is not set to be excluded from change streams")
5539+
}
5540+
}

spanner/pdml.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
8484
// Execute the PDML and retry if the transaction is aborted.
8585
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
8686
for {
87-
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req)
87+
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req, options)
8888
if err == nil {
8989
return count, nil
9090
}
@@ -106,14 +106,15 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
106106
// 3. Execute the update statement on the PDML transaction
107107
//
108108
// Note that PDML transactions cannot be committed or rolled back.
109-
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
109+
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest, options QueryOptions) (count int64, err error) {
110110
var md metadata.MD
111111
sh.updateLastUseTime()
112-
// Begin transaction.
112+
// Begin transaction
113113
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
114114
Session: sh.getID(),
115115
Options: &sppb.TransactionOptions{
116-
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
116+
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
117+
ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams,
117118
},
118119
})
119120
if err != nil {

spanner/pdml_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,25 @@ func TestPartitionedUpdate_Tagging(t *testing.T) {
179179
}
180180
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
181181
}
182+
183+
func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) {
184+
ctx := context.Background()
185+
server, client, teardown := setupMockedTestServer(t)
186+
defer teardown()
187+
188+
_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{ExcludeTxnFromChangeStreams: true})
189+
if err != nil {
190+
t.Fatalf("expect no errors, but got %v", err)
191+
}
192+
requests := drainRequestsFromServer(server.TestSpanner)
193+
if err := compareRequests([]interface{}{
194+
&sppb.BatchCreateSessionsRequest{},
195+
&sppb.BeginTransactionRequest{},
196+
&sppb.ExecuteSqlRequest{}}, requests); err != nil {
197+
t.Fatal(err)
198+
}
199+
200+
if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
201+
t.Fatal("Transaction is not set to be excluded from change streams")
202+
}
203+
}

0 commit comments

Comments
 (0)