diff --git a/internal/impl/snowflake/streaming/integration_test.go b/internal/impl/snowflake/streaming/integration_test.go index 5fb9bddc49..85999b5577 100644 --- a/internal/impl/snowflake/streaming/integration_test.go +++ b/internal/impl/snowflake/streaming/integration_test.go @@ -273,10 +273,11 @@ func TestIntegerCompat(t *testing.T) { ctx := context.Background() restClient, streamClient := setup(t) channelOpts := streaming.ChannelOptions{ - Name: t.Name(), - DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), - SchemaName: "PUBLIC", - TableName: "TEST_INT_TABLE", + Name: t.Name(), + DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), + SchemaName: "PUBLIC", + TableName: "TEST_INT_TABLE", + BuildParallelism: 1, } _, err := restClient.RunSQL(ctx, streaming.RunSQLRequest{ Database: channelOpts.DatabaseName, @@ -347,10 +348,11 @@ func TestTimestampCompat(t *testing.T) { ctx := context.Background() restClient, streamClient := setup(t) channelOpts := streaming.ChannelOptions{ - Name: t.Name(), - DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), - SchemaName: "PUBLIC", - TableName: "TEST_TIMESTAMP_TABLE", + Name: t.Name(), + DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), + SchemaName: "PUBLIC", + TableName: "TEST_TIMESTAMP_TABLE", + BuildParallelism: 1, } var columnDefs []string var columnNames []string diff --git a/internal/impl/snowflake/streaming/rest.go b/internal/impl/snowflake/streaming/rest.go index bca68015e8..5e43c2ae35 100644 --- a/internal/impl/snowflake/streaming/rest.go +++ b/internal/impl/snowflake/streaming/rest.go @@ -323,7 +323,7 @@ func NewRestClient(account, user, version, app string, privateKey *rsa.PrivateKe // this should only show up in development, not released binaries version = "99.0.0" } - userAgent := fmt.Sprintf("RedpandaConnect/%v", version) + userAgent := fmt.Sprintf("RedpandaConnect_SnowpipeStreamingSDK/%v", version) debugf(logger, "making snowflake HTTP requests using User-Agent: %s", userAgent) c = &SnowflakeRestClient{ account: account, diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 5c68aa6740..619895634d 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -156,6 +156,9 @@ type encryptionInfo struct { // OpenChannel creates a new or reuses a channel to load data into a Snowflake table. func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOptions) (*SnowflakeIngestionChannel, error) { + if opts.BuildParallelism <= 0 { + return nil, fmt.Errorf("invalid build parallelism: %d", opts.BuildParallelism) + } resp, err := c.client.openChannel(ctx, openChannelRequest{ RequestID: c.nextRequestID(), Role: c.options.Role,