Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Snowflake : fixed duplicate rows on retries #9141

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
93d53ac
fix for jdk 17
Dec 15, 2021
ede8d38
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
27be3fe
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
ca75033
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
72ab46f
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
aec3384
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
7efd5aa
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 20, 2021
6a773f9
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
fa18537
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
b0ba37b
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 22, 2021
fa31a0d
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 23, 2021
67e0bd6
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 24, 2021
04353b3
Destination Snowflake: duplicate rows on retries
Dec 28, 2021
4c8dcef
added changelog
Dec 28, 2021
ec0d1bd
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 28, 2021
3debdd9
fix checkstyle
Dec 28, 2021
23598ec
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 28, 2021
382c884
Merge branch 'master' into vmaltsev/8832-destination-snowflake-duplic…
Dec 28, 2021
59d63e6
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 30, 2021
5203709
replace concat with +
Jan 10, 2022
e2cb62c
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 10, 2022
b5010a2
Merge branch 'master' into vmaltsev/8832-destination-snowflake-duplic…
Jan 10, 2022
632583a
Merge branch 'master' of github.com:airbytehq/airbyte
Jan 10, 2022
76b157f
Merge branch 'master' into vmaltsev/8832-destination-snowflake-duplic…
Jan 10, 2022
d53981e
replaced static fields and methods with non-static
Jan 10, 2022
b5d9c81
bump version
Jan 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.4.1",
"dockerImageTag": "0.4.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake",
"icon": "snowflake.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.1
dockerImageTag: 0.4.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.1
LABEL io.airbyte.version=0.4.2
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -45,8 +46,9 @@ public class SnowflakeInternalStagingConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();

public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final SnowflakeSQLNameTransformer namingResolver,
Expand Down Expand Up @@ -130,7 +132,7 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon
return new AirbyteStreamNameNamespacePair(config.getStreamName(), config.getNamespace());
}

private static RecordWriter recordWriterFunction(final JdbcDatabase database,
private RecordWriter recordWriterFunction(final JdbcDatabase database,
final SqlOperations snowflakeSqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
Expand All @@ -149,13 +151,13 @@ private static RecordWriter recordWriterFunction(final JdbcDatabase database,
final WriteConfig writeConfig = pairToWriteConfig.get(pair);
final String schemaName = writeConfig.getOutputSchemaName();
final String tableName = writeConfig.getOutputTableName();
final String stageName = namingResolver.getStageName(schemaName, tableName);
final String path = namingResolver.getStagingPath(schemaName, tableName, CURRENT_SYNC_PATH);
Copy link
Contributor

@ChristopheDuong ChristopheDuong Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there are multiple "flush" on the same stream during the same sync (large streams), are all batches/files written to the same path / same object in the stage area?

the stage is like a folder? or is it like a file? (I guess it acts like a folder where we stage multiple files?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


snowflakeSqlOperations.insertRecords(database, records, schemaName, stageName);
snowflakeSqlOperations.insertRecords(database, records, schemaName, path);
};
}

private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
private OnCloseFunction onCloseFunction(final JdbcDatabase database,
final SnowflakeStagingSqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final SnowflakeSQLNameTransformer namingResolver) {
Expand All @@ -170,11 +172,18 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
LOGGER.info("Finalizing stream {}. schema {}, tmp table {}, final table {}", writeConfig.getStreamName(), schemaName, srcTableName,
dstTableName);

final String stageName = namingResolver.getStageName(schemaName, dstTableName);
sqlOperations.copyIntoTmpTableFromStage(database, stageName, srcTableName, schemaName);
LOGGER.info("Uploading data from stage: stream {}. schema {}, tmp table {}, stage {}", writeConfig.getStreamName(), schemaName,
srcTableName,
stageName);
final String path = namingResolver.getStagingPath(schemaName, dstTableName, CURRENT_SYNC_PATH);
LOGGER.info("Uploading data from stage: stream {}. schema {}, tmp table {}, stage path {}", writeConfig.getStreamName(), schemaName,
srcTableName,
path);
try {
sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName);
} catch (Exception e){
sqlOperations.cleanUpStage(database, path);
LOGGER.info("Cleaning stage path {}", path);
throw new RuntimeException("Failed to upload data from stage "+ path, e);
}

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(database, schemaName, dstTableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return SnowflakeInternalStagingConsumerFactory.create(outputRecordCollector, getDatabase(config),
return new SnowflakeInternalStagingConsumerFactory().create(outputRecordCollector, getDatabase(config),
new SnowflakeStagingSqlOperations(), new SnowflakeSQLNameTransformer(), config, catalog);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ public String getStageName(String schemaName, String outputTableName) {
return schemaName.concat(outputTableName).replaceAll("-", "_").toUpperCase();
}

public String getStagingPath(String schemaName, String tableName, String currentSyncPath) {
return (getStageName(schemaName,tableName)+"/staged/"+currentSyncPath).toUpperCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

public void cleanUpStage(JdbcDatabase database, String path) throws SQLException {
database.execute(String.format("REMOVE @%s;", path));
}

@Override
public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception {
return database.query(SHOW_SCHEMAS).map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase);
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ Finally, you need to add read/write permissions to your bucket with that email.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
| 0.4.2 | 2022-01-10 | [#9141](https://github.com/airbytehq/airbyte/pull/9141) | Fixed duplicate rows on retries |
| 0.4.1 | 2021-01-06 | [#9311](https://github.com/airbytehq/airbyte/pull/9311) | Update сreating schema during check |
| 0.4.0 | 2021-12-27 | [#9063](https://github.com/airbytehq/airbyte/pull/9063) | Updated normalization to produce permanent tables |
| 0.3.24 | 2021-12-23 | [#8869](https://github.com/airbytehq/airbyte/pull/8869) | Changed staging approach to Byte-Buffered |
Expand Down