diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java index 94432f4ddd402..6f983c6017b91 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.java @@ -26,6 +26,7 @@ public class AdvancedTestDataComparator implements TestDataComparator { public static final String AIRBYTE_DATE_FORMAT = "yyyy-MM-dd"; public static final String AIRBYTE_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss"; public static final String AIRBYTE_DATETIME_PARSED_FORMAT = "yyyy-MM-dd HH:mm:ss.S"; + public static final String AIRBYTE_DATETIME_PARSED_FORMAT_TZ = "yyyy-MM-dd HH:mm:ss XXX"; public static final String AIRBYTE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ssXXX"; @Override @@ -145,6 +146,10 @@ protected DateTimeFormatter getAirbyteDateTimeWithTzFormatter() { return DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_WITH_TZ_FORMAT); } + protected DateTimeFormatter getAirbyteDateTimeParsedWithTzFormatter() { + return DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_PARSED_FORMAT_TZ); + } + protected boolean isDateTimeWithTzValue(final String value) { return value.matches(".+[+-]\\d{2}:\\d{2}"); } diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java index b22c87193595b..2295f2e99e8f0 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java @@ -6,30 +6,36 @@ import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; -import java.time.LocalDateTime; -import java.time.ZoneOffset; +import java.time.LocalDate; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MSSQLTestDataComparator extends AdvancedTestDataComparator { - public static final String ACTUAL_MSSQL_AIRBYTE_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.S"; + private static final Logger LOGGER = LoggerFactory.getLogger(MSSQLTestDataComparator.class); private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); @Override - protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) { - if (!isDateTimeValue(destinationValue)) { - destinationValue = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(ACTUAL_MSSQL_AIRBYTE_DATETIME_FORMAT)).toString(); - } - return super.compareDateTimeValues(airbyteMessageValue, destinationValue); - } + protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue, + final String destinationValue) { + try { + final ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue, + getAirbyteDateTimeWithTzFormatter()); - @Override - protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { - LocalDateTime parsedDateTime = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(ACTUAL_MSSQL_AIRBYTE_DATETIME_FORMAT)); - return ZonedDateTime.of(parsedDateTime, ZoneOffset.UTC); + final ZonedDateTime destinationDate = ZonedDateTime.parse(destinationValue, + getAirbyteDateTimeParsedWithTzFormatter()); + return airbyteDate.equals(destinationDate); + } catch (DateTimeParseException e) { + LOGGER.warn( + "Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}", + airbyteMessageValue, destinationValue, e); + return compareTextValues(airbyteMessageValue, destinationValue); + } } @Override @@ -45,4 +51,31 @@ protected List resolveIdentifier(final String identifier) { return result; } + @Override + protected boolean compareDateTimeValues(String expectedValue, String actualValue) { + final var destinationDate = parseLocalDateTime(actualValue); + final var expectedDate = LocalDate.parse(expectedValue, + DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT)); + return expectedDate.equals(destinationDate); + } + + private LocalDate parseLocalDateTime(String dateTimeValue) { + if (dateTimeValue != null) { + return LocalDate.parse(dateTimeValue, + DateTimeFormatter.ofPattern(getFormat(dateTimeValue))); + } else { + return null; + } + } + + private String getFormat(String dateTimeValue) { + if (dateTimeValue.contains("T")) { + // MsSql stores array of objects as a jsobb type, i.e. array of string for all cases + return AIRBYTE_DATETIME_FORMAT; + } else { + // MsSql stores datetime as datetime type after normalization + return AIRBYTE_DATETIME_PARSED_FORMAT; + } + } + } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java index 4cc27f9777e96..a938966602c7f 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java @@ -45,7 +45,6 @@ protected boolean compareDateTimeValues(String expectedValue, String actualValue return expectedDate.equals(destinationDate); } - private LocalDate parseLocalDateTime(String dateTimeValue) { if (dateTimeValue != null) { return LocalDate.parse(dateTimeValue, diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java index ef4cf8f9cd285..6e296fb9f668a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java @@ -5,11 +5,11 @@ package io.airbyte.integrations.destination.redshift; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; -import java.time.DateTimeException; -import java.time.LocalDateTime; +import java.time.LocalDate; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -24,34 +24,62 @@ public class RedshiftTestDataComparator extends AdvancedTestDataComparator { protected static final String REDSHIFT_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd HH:mm:ssX"; @Override - protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { - return ZonedDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(REDSHIFT_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(ZoneOffset.UTC); + protected List resolveIdentifier(final String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; } @Override - protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) { + protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue, + final String destinationValue) { try { - var format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT); - LocalDateTime dateTime = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(REDSHIFT_DATETIME_WITH_TZ_FORMAT)); - return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime)); - } catch (DateTimeException e) { - LOGGER.warn("Fail to convert values to DateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}", + final ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue, + getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant( + ZoneOffset.UTC); + + final ZonedDateTime destinationDate = ZonedDateTime.parse(destinationValue, + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssX")); + return airbyteDate.equals(destinationDate); + } catch (DateTimeParseException e) { + LOGGER.warn( + "Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}", airbyteMessageValue, destinationValue, e); return compareTextValues(airbyteMessageValue, destinationValue); } } @Override - protected List resolveIdentifier(final String identifier) { - final List result = new ArrayList<>(); - final String resolved = namingResolver.getIdentifier(identifier); - result.add(identifier); - result.add(resolved); - if (!resolved.startsWith("\"")) { - result.add(resolved.toLowerCase()); - result.add(resolved.toUpperCase()); + protected boolean compareDateTimeValues(String expectedValue, String actualValue) { + var destinationDate = parseLocalDateTime(actualValue); + var expectedDate = LocalDate.parse(expectedValue, + DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT)); + return expectedDate.equals(destinationDate); + } + + private LocalDate parseLocalDateTime(String dateTimeValue) { + if (dateTimeValue != null) { + return LocalDate.parse(dateTimeValue, + DateTimeFormatter.ofPattern(getFormat(dateTimeValue))); + } else { + return null; + } + } + + private String getFormat(String dateTimeValue) { + if (dateTimeValue.contains("T")) { + // MySql stores array of objects as a jsonb type, i.e. array of string for all cases + return AIRBYTE_DATETIME_FORMAT; + } else { + // MySql stores datetime as datetime type after normalization + return AIRBYTE_DATETIME_PARSED_FORMAT; } - return result; } } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index da32ab5dea0f3..74e300f3012de 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -284,7 +284,8 @@ protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabas // Azure SQL does not support USE clause final String sql = - isAzureSQL ? "SELECT * FROM cdc.change_tables" : "USE [" + config.get(JdbcUtils.DATABASE_KEY).asText() + "]; SELECT * FROM cdc.change_tables"; + isAzureSQL ? "SELECT * FROM cdc.change_tables" + : "USE [" + config.get(JdbcUtils.DATABASE_KEY).asText() + "]; SELECT * FROM cdc.change_tables"; final PreparedStatement ps = connection.prepareStatement(sql); LOGGER.info(String.format( "Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'", diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index 9ba92c4c01fba..b04e49bd536ab 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -316,7 +316,6 @@ void testCdcCheckOperations() throws Exception { assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); } - @Test void testCdcCheckOperationsWithDot() throws Exception { // assertCdcEnabledInDb and validate escape with special character