-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destination Snowflake add test to avoid duplicated staged data #9412
Changes from 19 commits
93d53ac
ede8d38
27be3fe
ca75033
72ab46f
aec3384
7efd5aa
6a773f9
fa18537
b0ba37b
fa31a0d
67e0bd6
ec0d1bd
23598ec
59d63e6
e2cb62c
632583a
2bc76ce
78f4d29
603e4ac
b816c5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,12 +6,35 @@ | |
|
||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
import static org.mockito.ArgumentMatchers.anyString; | ||
import static org.mockito.Mockito.*; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.common.collect.ImmutableMap; | ||
import io.airbyte.commons.io.IOs; | ||
import io.airbyte.commons.jackson.MoreMappers; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.integrations.base.AirbyteMessageConsumer; | ||
import io.airbyte.integrations.base.Destination; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.AirbyteRecordMessage; | ||
import io.airbyte.protocol.models.CatalogHelpers; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.DestinationSyncMode; | ||
import io.airbyte.protocol.models.Field; | ||
import io.airbyte.protocol.models.JsonSchemaPrimitive; | ||
import org.junit.jupiter.api.DisplayName; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.nio.file.Path; | ||
import java.sql.SQLException; | ||
import java.time.Instant; | ||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
public class SnowflakeDestinationTest { | ||
|
||
private static final ObjectMapper mapper = MoreMappers.initMapper(); | ||
|
@@ -53,4 +76,54 @@ public void useInsertStrategyTest() { | |
assertFalse(SnowflakeDestination.isS3Copy(stubConfig)); | ||
} | ||
|
||
@Test | ||
public void testCleanupStageOnFailure() throws Exception { | ||
|
||
JdbcDatabase mockDb = mock(JdbcDatabase.class); | ||
SnowflakeStagingSqlOperations sqlOperations = mock(SnowflakeStagingSqlOperations.class); | ||
final var testMessages = generateTestMessages(); | ||
final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/insert_config.json"))); | ||
|
||
AirbyteMessageConsumer airbyteMessageConsumer = new SnowflakeInternalStagingConsumerFactory() | ||
.create(Destination::defaultOutputRecordCollector, mockDb, | ||
sqlOperations, new SnowflakeSQLNameTransformer(), config, getCatalog()); | ||
doThrow(SQLException.class).when(sqlOperations).copyIntoTmpTableFromStage(any(),anyString(),anyString(),anyString()); | ||
|
||
airbyteMessageConsumer.start(); | ||
for (AirbyteMessage m : testMessages) { | ||
airbyteMessageConsumer.accept(m); | ||
} | ||
|
||
try { | ||
airbyteMessageConsumer.close(); | ||
}catch (Exception e){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this need to catch all exception, or can it only catch There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
really good question. I believe SQLException should be enough |
||
//do nothing cause it's expected behavior | ||
} | ||
|
||
verify(sqlOperations, times(1)).cleanUpStage(any(),anyString()); | ||
} | ||
|
||
private List<AirbyteMessage> generateTestMessages() { | ||
return IntStream.range(0, 3) | ||
.boxed() | ||
.map(i -> new AirbyteMessage() | ||
.withType(AirbyteMessage.Type.RECORD) | ||
.withRecord(new AirbyteRecordMessage() | ||
.withStream("test") | ||
.withNamespace("test_staging") | ||
.withEmittedAt(Instant.now().toEpochMilli()) | ||
.withData(Jsons.jsonNode(ImmutableMap.of("id", i, "name", "human " + i))))) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
ConfiguredAirbyteCatalog getCatalog() { | ||
return new ConfiguredAirbyteCatalog().withStreams(List.of( | ||
CatalogHelpers.createConfiguredAirbyteStream( | ||
"test", | ||
"test_staging", | ||
Field.of("id", JsonSchemaPrimitive.NUMBER), | ||
Field.of("name", JsonSchemaPrimitive.STRING)) | ||
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this use
assertThrows(...)
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaced with assertThrows