diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 48f4d7b22f49d..96ede81218d09 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -1011,6 +1011,57 @@ public void testEntrypointEnvVar() throws Exception { assertFalse(entrypoint.isBlank()); } + /** + * Verify that destination doesn't fail if new fields arrive in the data after initial schema + * discovery and sync. + * + * @throws Exception + */ + @Test + public void testSyncNotFailsWithNewFields() throws Exception { + if (!implementsOverwrite()) { + LOGGER.info("Destination's spec.json does not support overwrite sync mode."); + return; + } + + final AirbyteCatalog catalog = + Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); + + final List firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines() + .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); + final JsonNode config = getConfig(); + runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false); + final var stream = catalog.getStreams().get(0); + + // Run second sync with new fields on the message + final List secondSyncMessagesWithNewFields = Lists.newArrayList( + new AirbyteMessage() + .withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(stream.getName()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(Jsons.jsonNode(ImmutableMap.builder() + .put("id", 1) + .put("currency", "USD") + .put("date", "2020-03-31T00:00:00Z") + .put("newFieldString", "Value for new field") + .put("newFieldNumber", 3) + .put("HKD", 10.1) + .put("NZD", 700.1) + .build()))), + new AirbyteMessage() + .withType(Type.STATE) + .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))))); + + // Run sync and verify that all message were written without failing + runSyncAndVerifyStateOutput(config, secondSyncMessagesWithNewFields, configuredCatalog, false); + var destinationOutput = retrieveRecords(testEnv, stream.getName(), getDefaultSchema(config), stream.getJsonSchema()); + // Remove state message + secondSyncMessagesWithNewFields.removeIf(airbyteMessage -> airbyteMessage.getType().equals(Type.STATE)); + assertEquals(secondSyncMessagesWithNewFields.size(), destinationOutput.size()); + } + /** * Whether the destination should be tested against different namespaces. */