Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snowflake: schema evolution #2982

Merged
merged 7 commits into from
Nov 7, 2024

Conversation

rockwotj
Copy link
Collaborator

@rockwotj rockwotj commented Nov 4, 2024

Support schema evolution for Redpanda Connect's new Snowflake output.

this is a mechanism that can be used to evolve the schema when extra
data is coming through
Will be needed for the column name we use for schema evolution
Enable schema evolution using a custom mapping function, to allow
customers to customize the evolution method. Schema evolution needs to
be a global pause so that we can reopen channels to get new schemas.

I'm not in love with how errors are being hijacked here but I have not
came up with a better idea in the short time working on.
@rockwotj rockwotj force-pushed the pikachu-is-evolving branch from f97e467 to f08d9f8 Compare November 5, 2024 01:50
Copy link
Collaborator

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @rockwotj! LGTM, just left a few small nitpicks. Feel free to 🐑 🚀

import "fmt"

// SchemaMismatchError occurs when the user provided data has data that
// doesn't match the schema *and* the table can be evolved to accomidate
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: accomidate

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏

func quoteColumnName(name string) string {
var quoted strings.Builder
// Default to assume we're just going to add quotes and there won't
// be any double quotes inside the string that need escaped.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: needs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

Comment on lines 562 to 566
return fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
}
v, err := out.AsBytes()
if err != nil {
return fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't distinguish between these errors

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call.

Comment on lines 573 to 593
_, err = o.restClient.RunSQL(ctx, streaming.RunSQLRequest{
// This looks very scary and it *should*. This is prone to SQL injection attacks. The column name is
// quoted according to the rules in Snowflake's documentation. This is also why we need to
// validate the data type, so that you can't sneak an injection attack in there.
Statement: fmt.Sprintf(`ALTER TABLE IDENTIFIER(?)
ADD COLUMN IF NOT EXISTS %s %s
COMMENT 'column created by schema evolution from Redpanda Connect'`,
col.ColumnName(),
columnType,
),
// Currently we set of timeout of 30 seconds so that we don't have to handle async operations
// that need polling to wait until they finish (results are made async when execution is longer
// than 45 seconds).
Timeout: 30,
Database: o.db,
Schema: o.schema,
Role: o.role,
Bindings: map[string]streaming.BindingValue{
"1": {Type: "TEXT", Value: o.table},
},
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a wrapper func so we don't duplicate this code & comments in MigrateNotNullColumn()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - thanks it's much easier to read now :)

Good comments as always!
@rockwotj rockwotj merged commit 44d711a into redpanda-data:main Nov 7, 2024
3 checks passed
@rockwotj rockwotj deleted the pikachu-is-evolving branch January 29, 2025 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants