Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(destinations): Stop writing resources when channel is closed #460

Merged
merged 4 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion clients/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table)

// Write writes rows as they are received from the channel to the destination plugin.
// resources is marshaled schema.Resource. We are not marshalling this inside the function
// because usually it is alreadun marshalled from the destination plugin.
// because usually it is already marshalled from the destination plugin.
func (c *DestinationClient) Write(ctx context.Context, source string, syncTime time.Time, resources <-chan []byte) (uint64, error) {
saveClient, err := c.pbClient.Write(ctx)
if err != nil {
Expand All @@ -261,6 +261,10 @@ func (c *DestinationClient) Write(ctx context.Context, source string, syncTime t
Source: source,
Timestamp: timestamppb.New(syncTime),
}); err != nil {
if err == io.EOF {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what this solves, given that in the other case this just returns an error and doesn't write to the channel, while with `break` it actually calls CloseAndRecv.

Copy link
Member Author

@erezrokah erezrokah Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write works in stream mode (it's not a synchronous RPC), so if we want the correct response/error from the write request we need to call RecvMsg which is only done in CloseAndRecv (and only once actually).

So SendMsg will never return an application-level error, as it doesn't wait for the message to be received by the server.

Technically I think we should call RecvMsg for each SendMsg per the docs:
> SendMsg does not wait until the message is received by the server. An untimely stream closure may result in lost messages. To ensure delivery, users should ensure the RPC completed successfully using RecvMsg.

We don't need ⬆️ as we don't have Bidi-streaming (I think), but still the response is received from CloseAndRecv, see https://grpc.io/docs/languages/go/generated-code/#client-streaming-methods-1

// don't send write request if the channel is closed
break
}
return 0, fmt.Errorf("failed to call Write.Send: %w", err)
}
}
Expand Down Expand Up @@ -292,6 +296,10 @@ func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, so
if err := saveClient.Send(&pb.Write2_Request{
Resource: resource,
}); err != nil {
if err == io.EOF {
// don't send write request if the channel is closed
break
}
return fmt.Errorf("failed to call Write2.Send: %w", err)
}
}
Expand Down
53 changes: 53 additions & 0 deletions clients/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package clients

import (
"context"
"encoding/json"
"os"
"path"
"strings"
"testing"
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

var newDestinationClientTestCases = []specs.Source{
Expand Down Expand Up @@ -59,3 +64,51 @@ func TestDestinationClient(t *testing.T) {
})
}
}

// TestDestinationClientWriteReturnsCorrectError streams a large number of
// resources to a sqlite destination plugin through Write2 and asserts that the
// error returned by Write2 contains "context canceled".
func TestDestinationClientWriteReturnsCorrectError(t *testing.T) {
	ctx := context.Background()
	l := zerolog.New(zerolog.NewTestWriter(t)).Output(zerolog.ConsoleWriter{Out: os.Stderr}).Level(zerolog.DebugLevel)
	dirName := t.TempDir()
	c, err := NewDestinationClient(ctx, specs.RegistryGithub, "cloudquery/sqlite", "v1.0.11", WithDestinationLogger(l), WithDestinationDirectory(dirName))
	if err != nil {
		t.Fatal(err)
	}

	defer func() {
		if err := c.Terminate(); err != nil {
			t.Logf("failed to terminate destination client: %v", err)
		}
	}()

	// NOTE(review): connectionString is unexported; if the spec is serialized
	// with encoding/json the field will be omitted. Confirm this is intended.
	sqliteSpec := struct {
		connectionString string
	}{connectionString: path.Join(dirName, "test.sql")}
	if err := c.Initialize(ctx, specs.Destination{Spec: sqliteSpec}); err != nil {
		t.Fatal(err)
	}

	name, err := c.Name(ctx)
	if err != nil {
		t.Fatal("failed to get name", err)
	}

	columns := []schema.Column{{Name: "int", Type: schema.TypeInt}}
	tables := schema.Tables{&schema.Table{Name: "test-1", Columns: columns}, &schema.Table{Name: "test-2", Columns: columns}}

	// Fail fast if the fixtures cannot be marshalled instead of silently
	// streaming nil payloads to the plugin.
	resource1 := schema.Resource{Item: map[string]any{"int": 1}, Table: tables[0]}
	destResource1, err := json.Marshal(resource1.ToDestinationResource())
	require.NoError(t, err)
	resource2 := schema.Resource{Item: map[string]any{"int": 1}, Table: tables[1]}
	destResource2, err := json.Marshal(resource2.ToDestinationResource())
	require.NoError(t, err)

	// done is closed when the test returns so the producer goroutine below
	// cannot stay blocked on a send after Write2 has stopped reading.
	done := make(chan struct{})
	defer close(done)

	resourcesChannel := make(chan []byte)
	go func() {
		defer close(resourcesChannel)
		batch := [][]byte{destResource1, destResource2, destResource1, destResource2}
		// we need to stream enough data to the server so it at least starts
		// processing it and returns the relevant error
		for i := 1; i < 100000; i++ {
			for _, r := range batch {
				select {
				case resourcesChannel <- r:
				case <-done:
					return
				}
			}
		}
	}()

	err = c.Write2(ctx, tables, name, time.Now().UTC(), resourcesChannel)
	require.ErrorContains(t, err, "context canceled")
}