From 5383439e6dd7f5aead72a3b22b91fafc526bfcc6 Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Wed, 12 Jan 2022 19:03:09 +0200 Subject: [PATCH] Destination Snowflake add test to avoid duplicated staged data (#9412) * fix for jdk 17 * added unit test * refactoring * replace Exception with SQLException Co-authored-by: vmaltsev --- ...owflakeInternalStagingConsumerFactory.java | 4 +- .../snowflake/SnowflakeDestinationTest.java | 71 ++++++++++++++++++- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java index 74f1e2358c1c8..4017ca7618794 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingConsumerFactory.java @@ -22,6 +22,8 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; + +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -178,7 +180,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database, path); try { sqlOperations.copyIntoTmpTableFromStage(database, path, srcTableName, schemaName); - } catch (Exception e){ + } catch (SQLException e){ sqlOperations.cleanUpStage(database, path); LOGGER.info("Cleaning stage path {}", path); throw new RuntimeException("Failed to upload data from stage "+ path, e); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index 2143cdcd22521..021fa9ed88feb 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -4,14 +4,36 @@ package io.airbyte.integrations.destination.snowflake; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaPrimitive; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import java.nio.file.Path; +import java.sql.SQLException; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + public class SnowflakeDestinationTest { private static final ObjectMapper mapper = MoreMappers.initMapper(); @@ -53,4 +75,49 @@ public void useInsertStrategyTest() { assertFalse(SnowflakeDestination.isS3Copy(stubConfig)); } + @Test + public void testCleanupStageOnFailure() throws Exception { + + JdbcDatabase mockDb = mock(JdbcDatabase.class); + SnowflakeStagingSqlOperations sqlOperations = mock(SnowflakeStagingSqlOperations.class); + final var testMessages = generateTestMessages(); + final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/insert_config.json"))); + + AirbyteMessageConsumer airbyteMessageConsumer = new SnowflakeInternalStagingConsumerFactory() + .create(Destination::defaultOutputRecordCollector, mockDb, + sqlOperations, new SnowflakeSQLNameTransformer(), config, getCatalog()); + doThrow(SQLException.class).when(sqlOperations).copyIntoTmpTableFromStage(any(),anyString(),anyString(),anyString()); + + airbyteMessageConsumer.start(); + for (AirbyteMessage m : testMessages) { + airbyteMessageConsumer.accept(m); + } + assertThrows(RuntimeException.class, airbyteMessageConsumer::close); + + verify(sqlOperations, times(1)).cleanUpStage(any(),anyString()); + } + + private List generateTestMessages() { + return IntStream.range(0, 3) + .boxed() + .map(i -> new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream("test") + .withNamespace("test_staging") + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(Jsons.jsonNode(ImmutableMap.of("id", i, "name", "human " + i))))) + .collect(Collectors.toList()); + } + + ConfiguredAirbyteCatalog getCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(List.of( + CatalogHelpers.createConfiguredAirbyteStream( + "test", + "test_staging", + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + } + }