diff --git a/internal/impl/snowflake/streaming/integration_test.go b/internal/impl/snowflake/streaming/integration_test.go index 5fb9bddc49..85999b5577 100644 --- a/internal/impl/snowflake/streaming/integration_test.go +++ b/internal/impl/snowflake/streaming/integration_test.go @@ -273,10 +273,11 @@ func TestIntegerCompat(t *testing.T) { ctx := context.Background() restClient, streamClient := setup(t) channelOpts := streaming.ChannelOptions{ - Name: t.Name(), - DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), - SchemaName: "PUBLIC", - TableName: "TEST_INT_TABLE", + Name: t.Name(), + DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), + SchemaName: "PUBLIC", + TableName: "TEST_INT_TABLE", + BuildParallelism: 1, } _, err := restClient.RunSQL(ctx, streaming.RunSQLRequest{ Database: channelOpts.DatabaseName, @@ -347,10 +348,11 @@ func TestTimestampCompat(t *testing.T) { ctx := context.Background() restClient, streamClient := setup(t) channelOpts := streaming.ChannelOptions{ - Name: t.Name(), - DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), - SchemaName: "PUBLIC", - TableName: "TEST_TIMESTAMP_TABLE", + Name: t.Name(), + DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), + SchemaName: "PUBLIC", + TableName: "TEST_TIMESTAMP_TABLE", + BuildParallelism: 1, } var columnDefs []string var columnNames []string diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 5c68aa6740..619895634d 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -156,6 +156,9 @@ type encryptionInfo struct { // OpenChannel creates a new or reuses a channel to load data into a Snowflake table. func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOptions) (*SnowflakeIngestionChannel, error) { + if opts.BuildParallelism <= 0 { + return nil, fmt.Errorf("invalid build parallelism: %d", opts.BuildParallelism) + } resp, err := c.client.openChannel(ctx, openChannelRequest{ RequestID: c.nextRequestID(), Role: c.options.Role,