snowflake: schema evolution #2982
This is a mechanism that can be used to evolve the schema when extra data comes through.
Will be needed for the column name we use for schema evolution
Enable schema evolution using a custom mapping function, to allow customers to customize the evolution method. Schema evolution needs to be a global pause so that we can reopen channels to get new schemas. I'm not in love with how errors are being hijacked here, but I have not come up with a better idea in the short time I have been working on this.
Thank you @rockwotj! LGTM, just left a few small nitpicks. Feel free to 🐑 🚀
import "fmt"

// SchemaMismatchError occurs when the user provided data has data that
// doesn't match the schema *and* the table can be evolved to accomidate
Typo: accomidate
🙏
func quoteColumnName(name string) string {
	var quoted strings.Builder
	// Default to assume we're just going to add quotes and there won't
	// be any double quotes inside the string that need escaped.
Typo: needs
Thanks.
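Only the first lines of `quoteColumnName` are visible in this thread, so the following is a rough sketch of the general technique rather than the PR's implementation. It assumes the usual Snowflake rule for double-quoted identifiers: wrap the name in double quotes and write each embedded double quote as two. The function name `quoteIdentifier` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdentifier is a hypothetical sketch of double-quoted identifier
// escaping: surround the name with double quotes and double any that
// appear inside it.
func quoteIdentifier(name string) string {
	var quoted strings.Builder
	// Reserve room for the name plus the two surrounding quotes. This is
	// only a hint; the builder grows further if quotes need doubling.
	quoted.Grow(len(name) + 2)
	quoted.WriteByte('"')
	for _, r := range name {
		if r == '"' {
			quoted.WriteString(`""`)
		} else {
			quoted.WriteRune(r)
		}
	}
	quoted.WriteByte('"')
	return quoted.String()
}

func main() {
	fmt.Println(quoteIdentifier(`user id`))
	fmt.Println(quoteIdentifier(`weird"name`))
}
```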
	return fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
}
v, err := out.AsBytes()
if err != nil {
	return fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
Can't distinguish between these errors
Good call.
_, err = o.restClient.RunSQL(ctx, streaming.RunSQLRequest{
	// This looks very scary and it *should*. This is prone to SQL injection attacks. The column name is
	// quoted according to the rules in Snowflake's documentation. This is also why we need to
	// validate the data type, so that you can't sneak an injection attack in there.
	Statement: fmt.Sprintf(`ALTER TABLE IDENTIFIER(?)
		ADD COLUMN IF NOT EXISTS %s %s
		COMMENT 'column created by schema evolution from Redpanda Connect'`,
		col.ColumnName(),
		columnType,
	),
	// Currently we set a timeout of 30 seconds so that we don't have to handle async operations
	// that need polling to wait until they finish (results are made async when execution is longer
	// than 45 seconds).
	Timeout:  30,
	Database: o.db,
	Schema:   o.schema,
	Role:     o.role,
	Bindings: map[string]streaming.BindingValue{
		"1": {Type: "TEXT", Value: o.table},
	},
})
Could we have a wrapper func so we don't duplicate this code & comments in MigrateNotNullColumn()?
Yes - thanks it's much easier to read now :)
Good comments as always!
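The wrapper that came out of this thread is not shown here, so the following is only a sketch of the idea: keep the statement construction and the data type validation in one place, so every caller goes through the same checks. The helper name `addColumnStatement` and the allow-list of types are assumptions for illustration, and the column name is expected to be quoted already.

```go
package main

import (
	"fmt"
	"strings"
)

// allowedTypes is a hypothetical allow-list. The type text is spliced into
// the SQL, so anything outside this set is rejected.
var allowedTypes = map[string]bool{
	"BOOLEAN":       true,
	"NUMBER":        true,
	"FLOAT":         true,
	"VARCHAR":       true,
	"VARIANT":       true,
	"TIMESTAMP_NTZ": true,
}

// addColumnStatement is a hypothetical helper that builds the ALTER TABLE
// text once. The table name stays a bind parameter (the ? placeholder).
func addColumnStatement(quotedColumn, columnType string) (string, error) {
	normalized := strings.ToUpper(strings.TrimSpace(columnType))
	if !allowedTypes[normalized] {
		return "", fmt.Errorf("unsupported column type %q", columnType)
	}
	return fmt.Sprintf(
		"ALTER TABLE IDENTIFIER(?) ADD COLUMN IF NOT EXISTS %s %s",
		quotedColumn, normalized,
	), nil
}

func main() {
	stmt, err := addColumnStatement(`"email"`, "varchar")
	fmt.Println(stmt, err)

	_, err = addColumnStatement(`"email"`, "VARCHAR; DROP TABLE users")
	fmt.Println(err)
}
```

An allow-list is stricter than pattern matching on the type string, at the cost of having to list every type the output may create.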
Support schema evolution for Redpanda Connect's new Snowflake output.