
Destination Snowflake: fixed duplicate rows on retries #9141

Merged

Conversation

@VitaliiMaltsev (Contributor) commented Dec 28, 2021

What

When running a sync, if the sync fails and has to retry, all rows that have already been put on the stage are appended to the stage again, resulting in duplicate rows.

How

The stage now has a per-sync folder in it, as described here: https://docs.snowflake.com/en/user-guide/data-load-local-file-system-stage.html
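
As a rough illustration of the approach (a hypothetical sketch, not the connector's actual code: StagingSqlOperations and StagingFlowSketch below are stand-ins with simplified signatures), each sync attempt stages its files under a fresh UUID-named subfolder and cleans that subfolder up if the copy fails:

import java.util.UUID;

// Illustrative stand-in for the connector's SQL operations; the real methods
// take more parameters (database handle, schema name, etc.).
interface StagingSqlOperations {
  void copyIntoTmpTableFromStage(String path, String tmpTable) throws Exception;
  void cleanUpStage(String path) throws Exception;
}

public class StagingFlowSketch {
  public static void copyWithCleanup(final StagingSqlOperations ops,
                                     final String stageName) throws Exception {
    // One random subdirectory per sync attempt, so a retried sync can never
    // re-append into a previous attempt's staged files.
    final String path = stageName + "/" + UUID.randomUUID();
    try {
      ops.copyIntoTmpTableFromStage(path, "_airbyte_tmp_users");
    } catch (final Exception e) {
      // On failure, drop only the files staged by this attempt before retrying.
      ops.cleanUpStage(path);
      throw e;
    }
  }
}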

Recommended reading order

  1. x.java

🚨 User Impact 🚨

There should be no visible impact on the user.

Pre-merge Checklist


Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to GitHub CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the new connector version is published, connector version bumped in the seed directory as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here


@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


vmaltsev seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@github-actions github-actions bot added the area/connectors Connector related issues label Dec 28, 2021
@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Dec 28, 2021
@VitaliiMaltsev (Author) commented Dec 28, 2021

/test connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1630097546
✅ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1630097546
Python tests coverage:

	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                                              Stmts   Miss  Cover
	 -------------------------------------------------------------------------------------
	 main_dev_transform_catalog.py                                         3      3     0%
	 main_dev_transform_config.py                                          3      3     0%
	 normalization/__init__.py                                             4      0   100%
	 normalization/destination_type.py                                    13      0   100%
	 normalization/transform_catalog/__init__.py                           2      0   100%
	 normalization/transform_catalog/catalog_processor.py                143     77    46%
	 normalization/transform_catalog/destination_name_transformer.py     124      6    95%
	 normalization/transform_catalog/reserved_keywords.py                 13      0   100%
	 normalization/transform_catalog/stream_processor.py                 494    313    37%
	 normalization/transform_catalog/table_name_registry.py              174     34    80%
	 normalization/transform_catalog/transform.py                         45     26    42%
	 normalization/transform_catalog/utils.py                             33      7    79%
	 normalization/transform_config/__init__.py                            2      0   100%
	 normalization/transform_config/transform.py                         146     32    78%
	 -------------------------------------------------------------------------------------
	 TOTAL                                                              1199    501    58%

@@ -45,6 +46,7 @@
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
private static final String currentSyncPath = UUID.randomUUID().toString();
Contributor:
Please use the Java convention for constants: uppercase with underscores.

@VitaliiMaltsev (Author):
renamed

path);
try {
sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
}catch (Exception e){
Contributor:
Looks like a space is missing here before the catch; please reformat.

@VitaliiMaltsev (Author):
fixed

@alexandertsukanov (Contributor):

@VitaliiMaltsev Minor comments, otherwise LGTM.

@sherifnada sherifnada requested a review from edgao January 5, 2022 08:24
@sherifnada (Contributor) commented Jan 5, 2022

@edgao do you mind reviewing this PR? feel free to reassign to liren if underwater

@sherifnada sherifnada removed their request for review January 5, 2022 08:24
@edgao (Contributor) left a comment:

could you add some tests for this behavior? (i.e. that we're creating different subdirectories per run, and that we're deleting the staging data on failure)

@@ -45,6 +46,7 @@
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
private static final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();
@edgao (Contributor):
I'd prefer for this to be an instance field (i.e. non-static). Then SnowflakeInternalStagingDestination#getConsumer would need to call new SnowflakeInternalStagingConsumerFactory().create(...)

generally I avoid non-constant static fields, since they can be confusing + difficult to test.

@VitaliiMaltsev (Author):
The CURRENT_SYNC_PATH field is used in the static methods recordWriterFunction and onCloseFunction, and a non-static field can't be used in static methods.

@edgao (Contributor):
I think those methods could just be switched to non-static as well (they're only called from create, so that should be safe)

@VitaliiMaltsev (Author):
@edgao switched to non-static
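
For reference, a minimal sketch of the shape this refactor takes (names follow the diff above, but the signatures here are simplified and hypothetical, not the connector's real ones):

import java.util.UUID;

public class SnowflakeInternalStagingConsumerFactory {

  // Instance field: one random staging subdirectory per factory instance,
  // i.e. per sync attempt, instead of a shared static value.
  private final String currentSyncPath = UUID.randomUUID().toString();

  // Formerly static; as an instance method it can read currentSyncPath directly.
  private String stagingPath(final String schemaName, final String tableName) {
    return schemaName + "/" + tableName + "/" + currentSyncPath;
  }

  public static void main(final String[] args) {
    // Each new factory (i.e. each sync) gets its own folder, so retries can't collide.
    System.out.println(new SnowflakeInternalStagingConsumerFactory()
        .stagingPath("PUBLIC", "USERS"));
  }
}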

@VitaliiMaltsev (Author) commented Jan 10, 2022

> could you add some tests for this behavior? (i.e. that we're creating different subdirectories per run, and that we're deleting the staging data on failure)

@edgao While implementing this solution, I manually canceled the queries in the Snowflake console at exactly the moment when the data from the stage is inserted into the tables, in order to force a retry and a subsequent sync of the same connection in Airbyte. I'm not sure how to write a test that reproduces the same behavior.
During test execution the query would have to be canceled at a certain moment, but we don't know when, and we don't have access to the query id to cancel it: https://docs.snowflake.com/en/sql-reference/functions/system_cancel_query.html

@sherifnada (Contributor):

@VitaliiMaltsev can you create a separate issue for testing this behavior and work on it after merging this PR? I agree with Edward that we should have a test but would love to release this fix asap as it's a critical issue

@edgao (Contributor) commented Jan 10, 2022

I think writing a unit test would be sufficient - if you pass in a mocked SnowflakeStagingSqlOperations and initialize it with doThrow(...).when(mockedSqlOps).copyIntoTmpTableFromStage(...), then you can use verify(mockedSqlOps).cleanUpStage(...) to check that it handled the exception correctly
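
A rough sketch of what such a test could look like (JUnit 5 + Mockito, reusing the hypothetical StagingSqlOperations / StagingFlowSketch stand-ins from the sketch in the PR description above, not the connector's real classes):

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.Test;

class StagingCleanupTest {

  @Test
  void cleansUpStageWhenCopyFails() throws Exception {
    final StagingSqlOperations ops = mock(StagingSqlOperations.class);
    // Simulate a failed sync attempt: the copy step throws.
    doThrow(new RuntimeException("copy failed"))
        .when(ops).copyIntoTmpTableFromStage(anyString(), anyString());

    assertThrows(Exception.class,
        () -> StagingFlowSketch.copyWithCleanup(ops, "my_stage"));

    // Verify the failed attempt cleaned up its own staging folder.
    verify(ops).cleanUpStage(anyString());
  }
}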

vmaltsev added 2 commits January 10, 2022 19:00
…ate-rows

# Conflicts:
#	airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeStagingSqlOperations.java
#	docs/integrations/destinations/snowflake.md
@edgao (Contributor) left a comment:
saw that you created #9389 - :shipit:

@VitaliiMaltsev (Author) commented Jan 10, 2022

/publish connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1678741625
✅ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1678741625

@@ -149,13 +151,13 @@ private static RecordWriter recordWriterFunction(final JdbcDatabase database,
final WriteConfig writeConfig = pairToWriteConfig.get(pair);
final String schemaName = writeConfig.getOutputSchemaName();
final String tableName = writeConfig.getOutputTableName();
final String stageName = namingResolver.getStageName(schemaName, tableName);
final String path = namingResolver.getStagingPath(schemaName, tableName, CURRENT_SYNC_PATH);
@ChristopheDuong (Contributor) commented Mar 4, 2022:
When there are multiple "flushes" on the same stream during the same sync (large streams), are all batches/files written to the same path / the same object in the stage area?

Is the stage like a folder, or like a file? (I guess it acts like a folder where we stage multiple files?)



Successfully merging this pull request may close these issues.

🐛 Destination Snowflake: duplicate rows on retries when using incremental staging
10 participants