Destination Snowflake : fixed duplicate rows on retries #9141
Conversation
vmaltsev seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.
/test connector=connectors/destination-snowflake
@@ -45,6 +46,7 @@
  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

  private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
  private static final String currentSyncPath = UUID.randomUUID().toString();
Please use the Java convention for constants: uppercase with underscores.
renamed
path);
try {
  sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
}catch (Exception e){
Looks like a space is missing here before the catch; please reformat.
fixed
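(For context, a minimal sketch of what this block is aiming at: copy staged files into the tmp table and, if the copy fails, remove the staged files for this sync so a retry does not append the same rows again. The interface and the cleanUpStage(...) helper below are stand-ins for illustration, not the exact PR code.)

// Sketch only: cleanup-on-failure shape assumed from the PR's goal.
final class CopyWithCleanupSketch {

  // Stand-in for the connector's staging SQL operations.
  interface StagingSqlOperations {
    void copyIntoTmpTableFromStage(Object database, String path, String srcTableName, String schemaName) throws Exception;
    void cleanUpStage(Object database, String path) throws Exception; // hypothetical helper
  }

  static void copyAndCleanUpOnFailure(final Object database,
                                      final StagingSqlOperations sqlOperations,
                                      final String path,
                                      final String srcTableName,
                                      final String schemaName) throws Exception {
    try {
      sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
    } catch (final Exception e) {
      // Drop the staged files for this sync so a retry starts from a clean path.
      sqlOperations.cleanUpStage(database, path);
      throw e;
    }
  }
}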
@VitaliiMaltsev Minor comments, otherwise LGTM.
@edgao do you mind reviewing this PR? feel free to reassign to liren if underwater
could you add some tests for this behavior? (i.e. that we're creating different subdirectories per run, and that we're deleting the staging data on failure)
@@ -45,6 +46,7 @@
  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

  private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
  private static final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();
I'd prefer for this to be an instance field (i.e. non-static). Then SnowflakeInternalStagingDestination#getConsumer would need to call new SnowflakeInternalStagingConsumerFactory().create(...).
Generally I avoid non-constant static fields, since they can be confusing + difficult to test.
The CURRENT_SYNC_PATH field is used in the static methods recordWriterFunction and onCloseFunction; a non-static field can't be used in static methods.
I think those methods could just be switched to non-static as well (they're only called from create, so that should be safe).
@edgao switched to non-static
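(For readers following the thread, a minimal self-contained sketch of the pattern being discussed, assuming a simplified stand-in class rather than the real Airbyte factory: the per-sync path becomes an instance field and the helpers that read it become instance methods.)

import java.util.UUID;
import java.util.function.Supplier;

// Minimal sketch, not the actual Airbyte class.
public class StagingConsumerFactorySketch {

  // New value per factory instance, i.e. per sync; nothing is shared across the JVM.
  private final String currentSyncPath = UUID.randomUUID().toString();

  // Formerly-static helper, now an instance method so it can read currentSyncPath.
  private String stagingPathFor(final String schemaName, final String tableName) {
    return schemaName + "_" + tableName + "/" + currentSyncPath;
  }

  // Callers would do: new StagingConsumerFactorySketch().create(...)
  public Supplier<String> create(final String schemaName, final String tableName) {
    return () -> stagingPathFor(schemaName, tableName);
  }
}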
@edgao While implementing this solution, I manually cancelled the queries in the Snowflake console while an Airbyte sync was running, exactly at the moment when the data from the stage is inserted into the tables, so that this would trigger a retry and then the next sync of the same connection in Airbyte. I'm not sure how to write a test that reproduces the same behavior.
…ate-rows # Conflicts: # docs/integrations/destinations/snowflake.md
@VitaliiMaltsev can you create a separate issue for testing this behavior and work on it after merging this PR? I agree with Edward that we should have a test, but would love to release this fix ASAP as it's a critical issue.
I think writing a unit test would be sufficient - if you pass in a mocked
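(A rough sketch of that kind of unit test, building on the sketch class above; the StagingOperations interface is a stand-in for the real staging SQL operations, not the actual Airbyte API, and the failure-handling try/catch only mimics what the consumer would do.)

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.Test;

// Sketch only: shows the shape of the tests requested above (distinct staging
// sub-paths per sync, and cleanup of staged data when the copy fails).
class StagingPathSketchTest {

  // Stand-in for the real staging SQL operations interface.
  interface StagingOperations {
    void copyIntoTmpTableFromStage(String path) throws Exception;
    void cleanUpStage(String path) throws Exception;
  }

  @Test
  void eachSyncUsesADifferentStagingPath() {
    // Two separate factory instances (two syncs) must not share a staging sub-path.
    final StagingConsumerFactorySketch firstSync = new StagingConsumerFactorySketch();
    final StagingConsumerFactorySketch secondSync = new StagingConsumerFactorySketch();
    assertNotEquals(firstSync.create("schema", "table").get(),
        secondSync.create("schema", "table").get());
  }

  @Test
  void stagingDataIsCleanedUpWhenCopyFails() throws Exception {
    final StagingOperations ops = mock(StagingOperations.class);
    doThrow(new RuntimeException("copy failed")).when(ops).copyIntoTmpTableFromStage(anyString());

    final String path = "schema_table/some-sync-id";
    try {
      ops.copyIntoTmpTableFromStage(path);
    } catch (final Exception e) {
      // Stand-in for the consumer's failure handling: drop the staged files.
      ops.cleanUpStage(path);
    }
    verify(ops).cleanUpStage(path);
  }
}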
…ate-rows # Conflicts: # airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java # docs/integrations/destinations/snowflake.md
saw that you created #9389 -
/publish connector=connectors/destination-snowflake
@@ -149,13 +151,13 @@ private static RecordWriter recordWriterFunction(final JdbcDatabase database,
  final WriteConfig writeConfig = pairToWriteConfig.get(pair);
  final String schemaName = writeConfig.getOutputSchemaName();
  final String tableName = writeConfig.getOutputTableName();
  final String stageName = namingResolver.getStageName(schemaName, tableName);
  final String path = namingResolver.getStagingPath(schemaName, tableName, CURRENT_SYNC_PATH);
When there are multiple "flushes" on the same stream during the same sync (large streams), are all batches/files written to the same path / same object in the stage area? Is the stage like a folder, or is it like a file? (I guess it acts like a folder where we stage multiple files?)
The path is like a virtual folder. It is similar to S3:
https://docs.snowflake.com/en/user-guide/data-load-considerations-stage.html
We always use the temp filename as the file under the folder:
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java#L45
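(To make that concrete, a small illustrative sketch of the folder-like layout; the naming scheme below is an assumption for illustration, not the connector's exact naming code.)

import java.util.UUID;

// Illustrative only: a staging path behaves like a virtual folder, and each
// flush of a large stream stages a separately named file under that folder.
public class StagePathLayoutSketch {
  public static void main(final String[] args) {
    final String syncFolder = UUID.randomUUID().toString();         // one sub-folder per sync
    final String stagingPath = "my_schema_my_table/" + syncFolder;  // "virtual folder" in the stage

    for (int flush = 0; flush < 3; flush++) {
      // Each flush gets its own temp file name under the same per-sync folder.
      final String stagedFile = stagingPath + "/" + UUID.randomUUID() + ".csv";
      System.out.println("flush " + flush + " stages file: " + stagedFile);
    }
  }
}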
What
When a sync fails partway through and has to retry, all rows that have already been put on the stage are appended to the stage again, so there are duplicate rows.
How
The stage now has a folder in it, as mentioned here: https://docs.snowflake.com/en/user-guide/data-load-local-file-system-stage.html
Recommended reading order
x.java
🚨 User Impact 🚨
There should be no visible impact on the user
Pre-merge Checklist
Expand the relevant checklist and delete the others.

New Connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- Unit & integration tests added and passing (for Java connectors: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest)
- Connector's README.md updated
- Connector's bootstrap.md updated. See description and examples
- docs/SUMMARY.md updated
- docs/integrations/<source or destination>/<name>.md updated, including changelog. See changelog example
- docs/integrations/README.md updated
- airbyte-integrations/builds.md updated

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- New connector version released by running the /publish command described here

Updating a connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- Unit & integration tests added and passing (for Java connectors: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest)
- Connector's README.md updated
- Connector's bootstrap.md updated. See description and examples
- docs/integrations/<source or destination>/<name>.md updated, including changelog. See changelog example

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- New connector version released by running the /publish command described here

Connector Generator
- All connector templates with -scaffold in their name have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes