From a688918c3230d84b0e231f9ca6517ef821b44264 Mon Sep 17 00:00:00 2001 From: Yurii Bidiuk Date: Thu, 7 Apr 2022 17:31:37 +0300 Subject: [PATCH] Revert "DAT: verify that a destination is able to write any ISO8601-compliant date string (#9816)" This reverts commit 2f850b98ac1ba11aef00974642dbffd03fbf8ce8. --- .../destination/DateTimeConverter.java | 74 ---- .../destination/DateTimeUtils.java | 320 ------------------ .../DestinationAcceptanceTest.java | 137 +------- .../src/main/resources/edge_case_catalog.json | 70 +--- .../src/main/resources/edge_case_messages.txt | 13 - .../src/main/resources/format_reference.txt | 13 - ...obStorageCsvDestinationAcceptanceTest.java | 14 - ...DenormalizedDestinationAcceptanceTest.java | 68 ---- ...ormalizedGcsDestinationAcceptanceTest.java | 75 ---- .../BigQueryDestinationAcceptanceTest.java | 42 --- .../DatabricksDestinationAcceptanceTest.java | 83 ----- .../gcs/GcsAvroDestinationAcceptanceTest.java | 79 ----- .../gcs/GcsCsvDestinationAcceptanceTest.java | 11 - .../GcsParquetDestinationAcceptanceTest.java | 77 ----- ...trictEncryptDestinationAcceptanceTest.java | 52 --- .../mssql/MSSQLDestinationAcceptanceTest.java | 48 --- .../MSSQLDestinationAcceptanceTestSSL.java | 48 --- .../SshMSSQLDestinationAcceptanceTest.java | 48 --- ...trictEncryptDestinationAcceptanceTest.java | 27 +- .../mysql/MySQLDestinationAcceptanceTest.java | 27 +- .../SshMySQLDestinationAcceptanceTest.java | 26 +- ...estinationStrictEncryptAcceptanceTest.java | 32 -- .../PostgresDestinationAcceptanceTest.java | 32 -- .../SshPostgresDestinationAcceptanceTest.java | 31 -- .../PubsubDestinationAcceptanceTest.java | 1 - ...RedshiftCopyDestinationAcceptanceTest.java | 29 -- .../s3/S3AvroDestinationAcceptanceTest.java | 78 ----- .../s3/S3CsvDestinationAcceptanceTest.java | 14 - .../S3ParquetDestinationAcceptanceTest.java | 77 ----- ...wflakeInsertDestinationAcceptanceTest.java | 63 ---- 30 files changed, 12 insertions(+), 1697 deletions(-) delete mode 100644 airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeConverter.java delete mode 100644 airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeUtils.java delete mode 100644 airbyte-integrations/bases/standard-destination-test/src/main/resources/format_reference.txt diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeConverter.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeConverter.java deleted file mode 100644 index bca59bc93a339..0000000000000 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeConverter.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.standardtest.destination; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import java.util.List; -import java.util.Map; - -public interface DateTimeConverter { - - /** - * Search dateTimeFieldNames inside data from @messages and converts to connector-specific date or - * date-time format - * - * @param messages list with AirbyteMessage - * @param dateTimeFieldNames map where key - path to the date/date-time field (e.g. - * /parentField/field), value - "date" or "date-time" depends on the format from catalog. - */ - default void convertDateTimeFields(List messages, Map dateTimeFieldNames) { - for (AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - convertDateTime((ObjectNode) message.getRecord().getData(), dateTimeFieldNames); - } - } - } - - /** - * Search dateTimeFieldNames inside @data and converts to connector-specific date or date-time - * format - * - * @param data from message record - * @param dateTimeFieldNames map where key - path to the date/date-time field (e.g. - * /parentField/field), value - "date" or "date-time" depends on the format from catalog. - */ - default void convertDateTime(ObjectNode data, Map dateTimeFieldNames) {} - - /** - * Override this method and return 'true' if destination connector requires conversion for - * date/date-time fields for testSyncWithNormalization() method. Then override convertDateTime(..) - * method to convert data to specific date-time format - * - * @return true - if destination connector requires conversion for date/date-time fields, false - in - * the other case. - */ - default boolean requiresDateTimeConversionForNormalizedSync() { - return false; - } - - /** - * Override this method and return 'true' if destination connector requires conversion for - * date/date-time fields for testSync() method. Then override convertDateTime(..) method to convert - * data to specific date-time format - * - * @return true - if destination connector requires conversion for date/date-time fields, false - in - * the other case. - */ - default boolean requiresDateTimeConversionForSync() { - return false; - } - - /** - * - * @param path path to field e.g /field1/nested_field - * @return true if the path consists of only one field (e.g /someField) - */ - default boolean isOneLevelPath(String path) { - return path.lastIndexOf("/") == 0; - } - -} diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeUtils.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeUtils.java deleted file mode 100644 index bcc3292bca482..0000000000000 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DateTimeUtils.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.standardtest.destination; - -import com.google.common.annotations.VisibleForTesting; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeParseException; -import java.time.temporal.ChronoField; -import java.time.temporal.ChronoUnit; -import java.util.function.Function; -import java.util.regex.Pattern; - -/** - * Used for operations on date/date-time strings to convert it to the specific connector's format - * Only for test purposes. - */ -public class DateTimeUtils { - - public static final String DATE_TIME_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"; - - public static final String DATE_TIME = "date-time"; - public static final String DATE = "date"; - - public static final Pattern MILLISECONDS_PATTERN = Pattern.compile("\\.\\d*"); - - private static final DateTimeFormatter FORMATTER = - DateTimeFormatter.ofPattern( - "[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" + - "[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][SS][' '][z][zzz][Z][O][x][XXX][XX][X]]]"); - - /** - * Parse the Json date-time logical type to long value of epoch microseconds. Only for test - * purposes! - * - * @return the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC. - */ - @VisibleForTesting - public static Long getEpochMicros(String jsonDateTime) { - return convertDateTime(jsonDateTime, instant -> ChronoUnit.MICROS.between(Instant.EPOCH, instant)); - } - - /** - * Parse the Json date logical type to int value of epoch day. - * - * @return the number of days from the unix epoch, 1 January 1970 (ISO calendar). - */ - @VisibleForTesting - public static Integer getEpochDay(String jsonDate) { - return convertDate(jsonDate, date -> (int) date.toEpochDay()); - } - - /** - * Parse the Json date-time type to bigquery-denormalized specific format. Only for test purposes! - * - * @param data "2021-01-03T01:01:01.544+01:00" - * @return converted data "2021-01-03T01:01:01.544000" - */ - @VisibleForTesting - public static String convertToBigqueryDenormalizedFormat(String data) { - Instant instant = null; - try { - ZonedDateTime zdt = ZonedDateTime.parse(data, FORMATTER); - instant = zdt.toLocalDateTime().toInstant(ZoneOffset.UTC); - } catch (DateTimeParseException e) { - try { - LocalDateTime dt = LocalDateTime.parse(data, FORMATTER); - instant = dt.toInstant(ZoneOffset.UTC); - } catch (DateTimeParseException ex) { - // no logging since it may generate too much noise - } - } - return instant == null ? null : toBigqueryDenormalizedDateFormat(instant); - } - - /** - * Parse the Json date-time type to snowflake specific format. Only for test purposes! - * - * @param jsonDateTime e.g. "2021-01-03T01:01:01.544+01:00" - * @return converted data e.g. "2021-01-03T00:01:02Z" - */ - @VisibleForTesting - public static String convertToSnowflakeFormat(String jsonDateTime) { - Instant instant = null; - try { - ZonedDateTime zdt = ZonedDateTime.parse(jsonDateTime, FORMATTER); - instant = zdt.toLocalDateTime().atZone(ZoneId.systemDefault()).toLocalDateTime() - .toInstant(ZoneOffset.of(zdt.getOffset().toString())); - return DateTimeFormatter.ofPattern(DATE_TIME_FORMAT_PATTERN).withZone(ZoneOffset.UTC).format(instant); - } catch (DateTimeParseException e) { - try { - LocalDateTime dt = LocalDateTime.parse(jsonDateTime, FORMATTER); - instant = dt.toInstant(ZoneOffset.ofHours(-8)); - return DateTimeFormatter.ofPattern(DATE_TIME_FORMAT_PATTERN).withZone(ZoneOffset.UTC).format(instant); - } catch (DateTimeParseException ex) { - // no logging since it may generate too much noise - } - } - return instant == null ? null : instant.toString(); - } - - /** - * Parse the Json date-time type to Redshift specific format. Only for test purposes! - * - * @param jsonDateTime e.g. "2021-01-03T01:01:01.544+01:00" - * @return converted data e.g. "2021-01-03 00:01:01.544000+00" - */ - @VisibleForTesting - public static String convertToRedshiftFormat(String jsonDateTime) { - return convertDateTime(jsonDateTime, DateTimeUtils::toRedshiftDateFormat); - } - - /** - * Parse the Json date-time type to postgres specific format. Only for test purposes! - * - * @param jsonDateTime e.g. "2021-01-03T01:01:01.544+01:00" - * @return converted data e.g. "2021-01-03T00:01:01.544Z" - */ - @VisibleForTesting - public static String convertToPostgresFormat(String jsonDateTime) { - return convertDateTime(jsonDateTime, Instant::toString); - } - - /** - * Parse the Json date-time type to databricks specific format. Only for test purposes! - * - * @param jsonDateTime e.g. "2021-01-03T01:01:01.544+01:00" - * @return converted data "{\"member0\":2021-01-03 00:01:01.544,\"member1\":null}" - */ - @VisibleForTesting - public static String convertToDatabricksFormat(String jsonDateTime) { - return convertDateTime(jsonDateTime, DateTimeUtils::toDatabricksDateFormat); - } - - /** - * Parse the Json date-time type to MSSQL specific format. Only for test purposes! - * - * @param jsonDateTime e.g. "2021-01-03T01:01:01.544+01:00" - * @return converted data "2021-01-03 00:01:01.544" - */ - @VisibleForTesting - public static String convertToMSSQLFormat(String jsonDateTime) { - return convertDateTime(jsonDateTime, DateTimeUtils::toMSSQLDateFormat); - } - - /** - * Parse the Json date type to date-time format with zero values for time. Only for test purposes! - * - * @param jsonDate e.g. "2021-01-01" - * @return converted data "2021-01-01T00:00:00Z" - */ - @VisibleForTesting - public static String convertToDateFormatWithZeroTime(String jsonDate) { - return convertDate(jsonDate, date -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") - .format(date.atStartOfDay().atZone(ZoneId.systemDefault()))); - } - - /** - * Parse the Json date type to general ISO date format. Only for test purposes! - * - * @param jsonDate e.g. "2021-1-1" - * @return converted data "2021-01-01" - */ - @VisibleForTesting - public static String convertToDateFormat(String jsonDate) { - return convertDate(jsonDate, date -> DateTimeFormatter.ISO_LOCAL_DATE.format( - date.atStartOfDay().atZone(ZoneId.systemDefault()))); - } - - /** - * Verify if the value is date or date-time - * - * @param value any string value - * @return true - if value is date/date-time, false - if value has any other format - */ - public static boolean isDateTimeValue(String value) { - try { - ZonedDateTime.parse(value, FORMATTER); - return true; - } catch (DateTimeParseException ignored) { - try { - LocalDateTime.parse(value, FORMATTER); - return true; - } catch (DateTimeParseException exception) { - try { - LocalDate.parse(value, FORMATTER); - return true; - } catch (DateTimeParseException ex) { - return false; - } - } - } - } - - /** - * Parse the Json date type to Instant and applies function to convert instant to connector specific - * date string. Only for test purposes! - * - * @param jsonDateTime input date-time string - * @param dateTimeFormatter function to convert instant to specific date-time format string - * @param output type for date-time. Usually is String but for some cases could be a Long - * @return converted date-time to specific format - */ - @VisibleForTesting - private static T convertDateTime(String jsonDateTime, Function dateTimeFormatter) { - Instant instant = null; - try { - ZonedDateTime zdt = ZonedDateTime.parse(jsonDateTime, FORMATTER); - instant = zdt.toLocalDateTime().toInstant(ZoneOffset.of(zdt.getOffset().toString())); - } catch (DateTimeParseException e) { - try { - LocalDateTime dt = LocalDateTime.parse(jsonDateTime, FORMATTER); - instant = dt.toInstant(ZoneOffset.UTC); - } catch (DateTimeParseException ex) { - // no logging since it may generate too much noise - } - } - return instant == null ? null : dateTimeFormatter.apply(instant); - } - - /** - * Parse the Json date type to LocalDate and applies function to convert localDate to connector - * specific date string. Only for test purposes! - * - * @param jsonDate input date string - * @param dateFormatter function to convert LocalDate to specific date format string - * @param output type for date. Usually is String but for some cases could be Integer - * @return converted date-time to specific format - */ - @VisibleForTesting - private static T convertDate(String jsonDate, Function dateFormatter) { - T convertedDate = null; - try { - LocalDate date = LocalDate.parse(jsonDate, FORMATTER); - convertedDate = dateFormatter.apply(date); - } catch (DateTimeParseException e) { - // no logging since it may generate too much noise - } - return convertedDate; - } - - /** - * Formats instant to MSSQL date-time. Only for test purposes! - * - * @param instant input date-time - * @return string with date-time without 'T' separator and zero timezone ('Z') - */ - @VisibleForTesting - private static String toMSSQLDateFormat(Instant instant) { - if (instant.get(ChronoField.MILLI_OF_SECOND) == 0) { - return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S").withZone(ZoneOffset.UTC).format(instant); - } else { - return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC).format(instant); - } - } - - /** - * Formats instant to Redshift date-time. If instant has some milli of second, the output date-time - * string will contain it after seconds, in the other case millis will be omitted for output sting. - * Only for test purposes! - * - * @param instant input date-time - * @return string with date-time with zero timezone ('+00') and without 'T' separator - */ - @VisibleForTesting - private static String toRedshiftDateFormat(Instant instant) { - if (instant.get(ChronoField.MILLI_OF_SECOND) == 0) { - return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss'+00'").withZone(ZoneOffset.UTC).format(instant); - } else { - return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS'+00'").withZone(ZoneOffset.UTC).format(instant); - } - } - - /** - * Formats instant to bigquery-denormalized date-time. If instant has some milli of second, the - * output date-time string will contain it after seconds, in the other case millis will be omitted - * for output sting. Note: bigquery-denormalized represents millis by 6-digits, but the last 3 - * digits are always '0' (e.g. 12:43:21.333000) This is the reason of division to 1000000 and then - * multiplication to 1000 in this method Only for test purposes! - * - * @param instant input date-time - * @return string with date-time without time zone - */ - @VisibleForTesting - private static String toBigqueryDenormalizedDateFormat(Instant instant) { - if (instant.get(ChronoField.MILLI_OF_SECOND) == 0) { - return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(ZoneOffset.UTC).format(instant); - } else { - String formattedDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC).format(instant); - return MILLISECONDS_PATTERN.matcher(formattedDateTime).replaceAll("." + (instant.getNano() / 1000000) * 1000); - } - } - - /** - * Formats instant to Databricks date-time. Only for test purposes! - * - * @param instant input date-time - * @return wrapped string with date-time - */ - @VisibleForTesting - private static String toDatabricksDateFormat(Instant instant) { - if (instant.get(ChronoField.MILLI_OF_SECOND) == 0) { - return DateTimeFormatter.ofPattern( - "'{\"member0\":'yyyy-MM-dd HH:mm:ss',\"member1\":null}'").withZone(ZoneOffset.UTC) - .format(instant); - } else { - return DateTimeFormatter.ofPattern( - "'{\"member0\":'yyyy-MM-dd HH:mm:ss.SSS',\"member1\":null}'").withZone(ZoneOffset.UTC) - .format(instant); - } - } - -} diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 719b7213d82dc..891c550a79529 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -10,9 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -64,7 +62,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -72,15 +69,10 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Random; -import java.util.Spliterator; -import java.util.Spliterators; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; -import java.util.stream.StreamSupport; -import org.apache.commons.lang3.StringUtils; import org.joda.time.DateTime; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -94,7 +86,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class DestinationAcceptanceTest implements DateTimeConverter { +public abstract class DestinationAcceptanceTest { private static final Random RANDOM = new Random(); private static final String NORMALIZATION_VERSION = "dev"; @@ -111,8 +103,6 @@ public abstract class DestinationAcceptanceTest implements DateTimeConverter { private ProcessFactory processFactory; private WorkerConfigs workerConfigs; - protected Map dateTimeFieldNames = Collections.emptyMap(); - /** * Name of the docker image that the tests will run against. * @@ -379,11 +369,6 @@ public void testSync(final String messagesFilename, final String catalogFilename final JsonNode config = getConfig(); final String defaultSchema = getDefaultSchema(config); runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false); - if (requiresDateTimeConversionForSync()) { - dateTimeFieldNames = getDateTimeFieldsFormat(catalog.getStreams()); - convertDateTimeFields(messages, dateTimeFieldNames); - deserializeNestedObjects(messages, null); - } retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema); } @@ -583,11 +568,6 @@ public void testSyncWithNormalization(final String messagesFilename, final Strin final String defaultSchema = getDefaultSchema(config); final List actualMessages = retrieveNormalizedRecords(catalog, defaultSchema); - if (requiresDateTimeConversionForNormalizedSync()) { - dateTimeFieldNames = getDateTimeFieldsFormat(catalog.getStreams()); - convertDateTimeFields(messages, dateTimeFieldNames); - deserializeNestedObjects(messages, actualMessages); - } assertSameMessages(messages, actualMessages, true); } @@ -1129,27 +1109,18 @@ protected void retrieveRawRecordsAndAssertSameMessages(final AirbyteCatalog cata final List messages, final String defaultSchema) throws Exception { - final List actualMessages = retrieveRawRecords(catalog, defaultSchema); - - assertSameMessages(messages, actualMessages, false); - } - - protected List retrieveRawRecords(final AirbyteCatalog catalog, final String defaultSchema) - throws Exception { final List actualMessages = new ArrayList<>(); for (final AirbyteStream stream : catalog.getStreams()) { final String streamName = stream.getName(); final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema; - final List msgList = retrieveRecords(testEnv, streamName, schema, - stream.getJsonSchema()) - .stream() - .map(data -> new AirbyteRecordMessage().withStream(streamName).withNamespace(schema) - .withData(data)) - .toList(); + final List msgList = retrieveRecords(testEnv, streamName, schema, stream.getJsonSchema()) + .stream() + .map(data -> new AirbyteRecordMessage().withStream(streamName).withNamespace(schema).withData(data)) + .collect(Collectors.toList()); actualMessages.addAll(msgList); } - return actualMessages; + assertSameMessages(messages, actualMessages, false); } // ignores emitted at. @@ -1162,15 +1133,11 @@ protected void assertSameMessages(final List expected, .peek(recordMessage -> recordMessage.setEmittedAt(null)) .map(recordMessage -> pruneAirbyteInternalFields ? safePrune(recordMessage) : recordMessage) .map(AirbyteRecordMessage::getData) - .peek(this::sortDataFields) - .sorted(Comparator.comparing(JsonNode::toString)) .collect(Collectors.toList()); final List actualProcessed = actual.stream() .map(recordMessage -> pruneAirbyteInternalFields ? safePrune(recordMessage) : recordMessage) .map(AirbyteRecordMessage::getData) - .peek(this::sortDataFields) - .sorted(Comparator.comparing(JsonNode::toString)) .collect(Collectors.toList()); assertSameData(expectedProcessed, actualProcessed); @@ -1203,27 +1170,13 @@ private void assertSameData(final List expected, final List } LOGGER.info("For {} Expected {} vs Actual {}", key, expectedValue, actualValue); assertTrue(actualData.has(key)); - assertSameValue(key, expectedValue, actualValue); + assertSameValue(expectedValue, actualValue); } } } - /** - * Method that will sort all fields by name and rewrite JsonNode in sorted order - * - * @param data - data node from AirbyteMessage - */ - protected void sortDataFields(JsonNode data) { - var sortedFields = StreamSupport.stream(Spliterators.spliteratorUnknownSize(data.fields(), - Spliterator.ORDERED), false) - .sorted(Entry.comparingByKey(Comparator.comparing(String::toLowerCase))).toList(); - ((ObjectNode) data).removeAll(); - IntStream.range(0, sortedFields.size()) - .forEach(i -> ((ObjectNode) data).set(sortedFields.get(i).getKey().toLowerCase(), sortedFields.get(i).getValue())); - } - // Allows subclasses to implement custom comparison asserts - protected void assertSameValue(final String key, final JsonNode expectedValue, final JsonNode actualValue) { + protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) { assertEquals(expectedValue, actualValue); } @@ -1425,40 +1378,6 @@ public void testStressPerformance() throws Exception { destination.notifyEndOfStream(); } - /** - * This method goes through stream schemas and collect field names which format is "date" or - * "date-time" - * - * @return map where key is a field name and value is "date" or "date-time" - */ - protected static Map getDateTimeFieldsFormat(final List streams) { - final Map fieldFormats = new HashMap<>(); - - streams.stream().map(AirbyteStream::getJsonSchema).forEach(streamSchema -> { - findDateTimeFields(streamSchema, fieldFormats, StringUtils.EMPTY); - }); - - return fieldFormats; - } - - private static void findDateTimeFields(JsonNode streamSchema, Map fieldFormats, String parent) { - final JsonNode fieldDefinitions = streamSchema.get("properties"); - final Iterator> iterator = fieldDefinitions.fields(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().has("type") && entry.getValue().get("type").asText().equals("object") - && entry.getValue().has("properties")) { - findDateTimeFields(entry.getValue(), fieldFormats, parent + "/" + entry.getKey()); - } - if (entry.getValue().has("format")) { - String format = entry.getValue().get("format").asText(); - if (format.equalsIgnoreCase("date") || format.equalsIgnoreCase("date-time")) { - fieldFormats.put(parent + "/" + entry.getKey(), format); - } - } - } - } - private final static String LOREM_IPSUM = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque malesuada lacinia aliquet. Nam feugiat mauris vel magna dignissim feugiat. Nam non dapibus sapien, ac mattis purus. Donec mollis libero erat, a rutrum ipsum pretium id. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Integer nec aliquam leo. Aliquam eu dictum augue, a ornare elit.\n" + "\n" @@ -1549,44 +1468,4 @@ private static List getRecordMessagesWithNewNamespace(final List return airbyteMessages; } - /** - * Converts serialized json blob for nested object to real json object. E.g. {"key": - * "{\"nestedObject\" : \"one\"}"} will be converted to {"key": {"nestedObject" : "one"}} This - * method goes through @messages and store names of nested object fields to the set. After that it - * goes through - * - * @actualMessages and deserialize jsonb string fields from set, to JsonNode - * - * @param messages from edge_case_messages.txt - * @param actualMessages fetched messages from destination which could contain serialized json - * objects - */ - protected void deserializeNestedObjects(List messages, List actualMessages) { - HashSet nestedFieldNames = new HashSet<>(); - for (AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - var iterator = message.getRecord().getData().fieldNames(); - while (iterator.hasNext()) { - var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - nestedFieldNames.add(fieldName); - } - } - } - } - if (actualMessages != null) { - for (AirbyteRecordMessage message : actualMessages) { - nestedFieldNames.stream().filter(name -> message.getData().has(name)).forEach(name -> { - String data = message.getData().get(name).asText(); - try { - ((ObjectNode) message.getData()).set(name, - new ObjectMapper().readTree(data)); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - } - } - } diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json index d7a177f2b8020..3f7e6b03f9103 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json +++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json @@ -1,20 +1,5 @@ { "streams": [ - { - "name": "stream_with_nested_objects", - "json_schema": { - "properties": { - "nested_object": { - "type": "object", - "properties": { - "nested_field": { - "type": "string" - } - } - } - } - } - }, { "name": "streamWithCamelCase", "json_schema": { @@ -25,6 +10,7 @@ } } }, + { "name": "stream_with_underscores", "json_schema": { @@ -153,60 +139,6 @@ } } } - }, - { - "name": "stream_with_dates", - "json_schema": { - "properties": { - "date": { - "type": "string", - "format": "date" - }, - "datetime": { - "type": "string", - "format": "date-time" - } - } - } - }, - { - "name": "stream_with_nested_dates", - "json_schema": { - "properties": { - "nested_object": { - "type": "object", - "properties": { - "field": { - "type": "integer" - } - } - }, - "datetime_nested": { - "type": "string", - "format": "date-time" - } - } - } - }, - { - "name": "stream_with_same_names_different_datatype", - "json_schema": { - "properties": { - "object": { - "type": "object", - "properties": { - "created_at": { - "type": "string", - "format": "date-time" - } - } - }, - "created_at": { - "type": "string", - "format": "date" - } - } - } } ] } diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt index 8136238422aa4..5f2adf5a21b47 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt +++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt @@ -15,17 +15,4 @@ {"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : 202, "next_field_name" : "next_field_name_2" }}} {"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : 203 }}} {"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : 303, "binary_field_name":"dGVzdA==" }}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "date" : "2021-01-01"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-01 01:01:01"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-1-1 01:01:01"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-01 01:01:01.22654"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-05-08T02:07:23.88888Z"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-02T01:01:01+01:00"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-03T01:01:01.544+01:00"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-05T01:01:01Z"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-06T01:01:01-01:00"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-08T01:01:01+0000"}}} -{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "datetime" : "2021-01-01T01:01:01+01"}}} -{"type": "RECORD", "record": {"stream": "stream_with_nested_dates", "emitted_at": 1602637589000, "data": { "nested_object": { "field" : 123}, "datetime_nested": "2021-01-01 01:01:01"}}} -{"type": "RECORD", "record": {"stream": "stream_with_same_names_different_datatype", "emitted_at": 1602637589000, "data": { "object": { "created_at" : "2021-01-01T01:01:01+01"}, "created_at": "2021-01-01"}}} {"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}} diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/format_reference.txt b/airbyte-integrations/bases/standard-destination-test/src/main/resources/format_reference.txt deleted file mode 100644 index 06013da3372e2..0000000000000 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/format_reference.txt +++ /dev/null @@ -1,13 +0,0 @@ -VALID FORMATS: - -2021-01-01 -2021-01-01 01:01:01 -2021-1-1 01:01:01 -2021-01-01 01:01:01.22654 -2018-05-08T02:07:23.888888Z -2021-01-02T01:01:01+01:00 -2021-01-03T01:01:01.544+01:00 -2021-01-05T01:01:01Z -2021-01-06T01:01:01-01:00 -2021-01-08T01:01:01+0000 -2021-01-01T01:01:01+01 \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java index 254793d6ec0e4..904d55ae44dc0 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java @@ -9,9 +9,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Reader; @@ -132,15 +129,4 @@ protected String getAllSyncedObjects(String streamName) { } } - @Override - protected void retrieveRawRecordsAndAssertSameMessages(AirbyteCatalog catalog, - List messages, - String defaultSchema) - throws Exception { - final List actualMessages = retrieveRawRecords(catalog, defaultSchema); - deserializeNestedObjects(messages, actualMessages); - - assertSameMessages(messages, actualMessages, false); - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java index 6b006ebd41f5c..6603f588f9378 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.destination.bigquery; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; @@ -33,11 +31,9 @@ import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -47,11 +43,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.TimeZone; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -256,7 +250,6 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { tearDownBigQuery(); } })); - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); } @Override @@ -326,68 +319,7 @@ public void testSyncNormalizedWithoutNormalization(final String messagesFilename final String defaultSchema = getDefaultSchema(config); final List actualMessages = retrieveNormalizedRecords(catalog, defaultSchema); - dateTimeFieldNames = getDateTimeFieldsFormat(catalog.getStreams()); - convertDateTimeFields(messages, dateTimeFieldNames); - deserializeNestedObjects(messages, actualMessages); assertSameMessages(messages, actualMessages, true); } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - for (String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.convertToBigqueryDenormalizedFormat( - (data.get(pathFields.get(0)).asText()))); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1), - DateTimeUtils.convertToBigqueryDenormalizedFormat(data.at(path).asText())); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.convertToDateFormat(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - DateTimeUtils.convertToDateFormat((data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void deserializeNestedObjects(List messages, List actualMessages) { - for (AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - var iterator = message.getRecord().getData().fieldNames(); - if (iterator.hasNext()) { - var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - var data = message.getRecord().getData().get(fieldName).get(f).asText(); - ((ObjectNode) message.getRecord().getData()).put(fieldName, String.format("[FieldValue{attribute=PRIMITIVE, value=%s}]", data)); - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java index 5a0b4fcb6ee47..d49bd7f4097dd 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java @@ -4,25 +4,12 @@ package io.airbyte.integrations.destination.bigquery; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; -import java.math.BigDecimal; import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; public class BigQueryDenormalizedGcsDestinationAcceptanceTest extends BigQueryDenormalizedDestinationAcceptanceTest { @@ -63,66 +50,4 @@ protected JsonNode createConfig() throws IOException { .build()); } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - for (String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - var result = String.valueOf(new BigDecimal(DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000).divide(new BigDecimal(1000))); - result = !result.contains(".") ? result + ".0" : result; - data.put(pathFields.get(0).toLowerCase(), result); - - } else { - var result = String.valueOf(new BigDecimal(DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000).divide(new BigDecimal(1000))); - result = !result.contains(".") ? result + ".0" : result; - ((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1).toLowerCase(), - result); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.convertToDateFormat(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1).toLowerCase(), - DateTimeUtils.convertToDateFormat((data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void deserializeNestedObjects(List messages, List actualMessages) { - for (AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - var iterator = message.getRecord().getData().fieldNames(); - if (iterator.hasNext()) { - var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - var data = message.getRecord().getData().get(fieldName).get(f); - ((ObjectNode) message.getRecord().getData()).put(fieldName, - String.format("[FieldValue{attribute=PRIMITIVE, value=%s}]", data.asText())); - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index 259b9456dee44..8060648547466 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.destination.bigquery; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; @@ -31,11 +29,9 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.StandardNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -46,9 +42,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; -import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -297,40 +291,4 @@ private static Job waitForQuery(final Job queryJob) { } } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - var result = String.valueOf(new BigDecimal(DateTimeUtils.getEpochMicros(data.at(path).asText())).divide(new BigDecimal(1000000))); - data.put(key.toLowerCase(), !result.contains(".") ? result + ".0" : result); - } - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - @Override - protected void assertSameValue(String key, - JsonNode expectedValue, - JsonNode actualValue) { - if (DATE_TIME.equals(dateTimeFieldNames.getOrDefault(key, StringUtils.EMPTY))) { - Assertions.assertEquals(expectedValue.asLong() / 1000000, actualValue.asLong()); - } else { - super.assertSameValue(key, expectedValue, actualValue); - } - } - } diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java index 47f70786453b6..55f03862fa300 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.destination.databricks; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; import static org.jooq.impl.DSL.asterisk; import static org.jooq.impl.DSL.field; @@ -27,23 +25,14 @@ import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; import java.nio.file.Path; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; import org.jooq.SQLDialect; -import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,76 +149,4 @@ private static Database getDatabase(final DatabricksDestinationConfig databricks SQLDialect.DEFAULT); } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - for (String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), DateTimeUtils.convertToDatabricksFormat(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1).toLowerCase(), - DateTimeUtils.convertToDatabricksFormat(data.at(path).asText())); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - String.format("{\"member0\":%s,\"member1\":null}", - DateTimeUtils.convertToDateFormat(data.get(pathFields.get(0)).asText()))); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1).toLowerCase(), - String.format("{\"member0\":%s,\"member1\":null}", - DateTimeUtils.convertToDateFormat(data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void assertSameValue(String key, - JsonNode expectedValue, - JsonNode actualValue) { - var format = dateTimeFieldNames.getOrDefault(key, StringUtils.EMPTY); - if (DATE_TIME.equals(format) || DATE.equals(format)) { - Assertions.assertEquals(expectedValue.asText(), expectedValue.asText()); - } else { - super.assertSameValue(key, expectedValue, actualValue); - } - } - - @Override - protected void deserializeNestedObjects(List messages, List actualMessages) { - for (AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - var iterator = message.getRecord().getData().fieldNames(); - while (iterator.hasNext()) { - var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - var data = message.getRecord().getData().get(fieldName).get(f); - var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f, - data.asText()); - ((ObjectNode) message.getRecord().getData()).put(fieldName, wrappedData); - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java index 290ba9a9c37b8..a62a4e7f0b2c0 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java @@ -4,36 +4,22 @@ package io.airbyte.integrations.destination.gcs; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroConstants; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; -import org.apache.commons.lang3.StringUtils; public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest { @@ -79,69 +65,4 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(final ObjectNode data, final Map dateTimeFieldNames) { - for (final String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - final var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - (DateTimeUtils.getEpochMicros(data.get(pathFields.get(0)).asText()) / 1000) - * 1000); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - (DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.getEpochDay(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - DateTimeUtils.getEpochDay((data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void deserializeNestedObjects(final List messages, final List actualMessages) { - for (final AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - final var iterator = message.getRecord().getData().fieldNames(); - while (iterator.hasNext()) { - final var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - final var data = message.getRecord().getData().get(fieldName).get(f); - final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f, - dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\"")); - try { - ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData)); - } catch (final JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java index a527b716db40b..2dbc83a2a37ba 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java @@ -112,15 +112,4 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } - @Override - protected void retrieveRawRecordsAndAssertSameMessages(final AirbyteCatalog catalog, - final List messages, - final String defaultSchema) - throws Exception { - final List actualMessages = retrieveRawRecords(catalog, defaultSchema); - deserializeNestedObjects(messages, actualMessages); - - assertSameMessages(messages, actualMessages, false); - } - } diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java index 149790f5fd298..561de516aab11 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java @@ -4,36 +4,23 @@ package io.airbyte.integrations.destination.gcs; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroConstants; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.avro.generic.GenericData; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; @@ -85,68 +72,4 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(final ObjectNode data, final Map dateTimeFieldNames) { - for (final String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - final var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - (DateTimeUtils.getEpochMicros(data.get(pathFields.get(0)).asText()) / 1000) - * 1000); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - (DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.getEpochDay(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1), - DateTimeUtils.getEpochDay((data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void deserializeNestedObjects(final List messages, final List actualMessages) { - for (final AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - final var iterator = message.getRecord().getData().fieldNames(); - while (iterator.hasNext()) { - final var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - final var data = message.getRecord().getData().get(fieldName).get(f); - final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f, - dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\"")); - try { - ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData)); - } catch (final JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java index f466d70b8b77c..a84c56bcd3cfc 100644 --- a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.destination.mssql_strict_encrypt; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; @@ -20,20 +18,13 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.ssh.SshHelpers; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.protocol.models.ConnectorSpecification; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Spliterator; -import java.util.Spliterators; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MSSQLServerContainer; @@ -188,47 +179,4 @@ void testSpec() throws Exception { assertEquals(expected, actual); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - var fields = StreamSupport.stream(Spliterators.spliteratorUnknownSize(data.fields(), - Spliterator.ORDERED), false).toList(); - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToMSSQLFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - @Override - protected void assertSameValue(String key, - JsonNode expectedValue, - JsonNode actualValue) { - if (DateTimeUtils.isDateTimeValue(expectedValue.asText()) && DateTimeUtils.isDateTimeValue(actualValue.asText())) { - /* - * Omitted millis for assertion because MSSQL datetime values are rounded to increments of .000, - * .003, or .007 seconds - * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15 - * #rounding-of-datetime-fractional-second-precision - */ - Assertions.assertEquals(DateTimeUtils.MILLISECONDS_PATTERN.matcher(expectedValue.asText()).replaceAll(StringUtils.EMPTY), - DateTimeUtils.MILLISECONDS_PATTERN.matcher(actualValue.asText()).replaceAll(StringUtils.EMPTY)); - } else { - super.assertSameValue(key, expectedValue, actualValue); - } - } - } diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java index 9abf0d75d586d..b2da0d07a5517 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java @@ -4,9 +4,6 @@ package io.airbyte.integrations.destination.mssql; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -17,16 +14,12 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.MSSQLServerContainer; @@ -180,45 +173,4 @@ static void cleanUp() { db.close(); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToMSSQLFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - @Override - protected void assertSameValue(String key, - JsonNode expectedValue, - JsonNode actualValue) { - if (DateTimeUtils.isDateTimeValue(expectedValue.asText()) && DateTimeUtils.isDateTimeValue(actualValue.asText())) { - /* - * Omitted millis for assertion because MSSQL datetime values are rounded to increments of .000, - * .003, or .007 seconds - * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15 - * #rounding-of-datetime-fractional-second-precision - */ - Assertions.assertEquals(DateTimeUtils.MILLISECONDS_PATTERN.matcher(expectedValue.asText()).replaceAll(StringUtils.EMPTY), - DateTimeUtils.MILLISECONDS_PATTERN.matcher(actualValue.asText()).replaceAll(StringUtils.EMPTY)); - } else { - super.assertSameValue(key, expectedValue, actualValue); - } - } - } diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java index c101afb7521f6..8717a8c7f6c7f 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java @@ -4,9 +4,6 @@ package io.airbyte.integrations.destination.mssql; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -17,16 +14,12 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.utility.DockerImageName; @@ -190,45 +183,4 @@ static void cleanUp() { db.close(); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToMSSQLFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - @Override - protected void assertSameValue(String key, - JsonNode expectedValue, - JsonNode actualValue) { - if (DateTimeUtils.isDateTimeValue(expectedValue.asText()) && DateTimeUtils.isDateTimeValue(actualValue.asText())) { - /* - * Omitted millis for assertion because MSSQL datetime values are rounded to increments of .000, - * .003, or .007 seconds - * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15 - * #rounding-of-datetime-fractional-second-precision - */ - Assertions.assertEquals(DateTimeUtils.MILLISECONDS_PATTERN.matcher(expectedValue.asText()).replaceAll(StringUtils.EMPTY), - DateTimeUtils.MILLISECONDS_PATTERN.matcher(actualValue.asText()).replaceAll(StringUtils.EMPTY)); - } else { - super.assertSameValue(key, expectedValue, actualValue); - } - } - } diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/SshMSSQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/SshMSSQLDestinationAcceptanceTest.java index 2cfbfd3545534..e0a1cba4bb3de 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/SshMSSQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/SshMSSQLDestinationAcceptanceTest.java @@ -4,9 +4,6 @@ package io.airbyte.integrations.destination.mssql; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -19,16 +16,12 @@ import io.airbyte.integrations.base.ssh.SshBastionContainer; import io.airbyte.integrations.base.ssh.SshTunnel; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Assertions; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.Network; @@ -193,45 +186,4 @@ protected void tearDown(final TestDestinationEnv testEnv) { bastion.stopAndCloseContainers(db); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToMSSQLFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - @Override - protected void assertSameValue(String key, - JsonNode expectedValue, - JsonNode actualValue) { - if (DateTimeUtils.isDateTimeValue(expectedValue.asText()) && DateTimeUtils.isDateTimeValue(actualValue.asText())) { - /* - * Omitted millis for assertion because MSSQL datetime values are rounded to increments of .000, - * .003, or .007 seconds - * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15 - * #rounding-of-datetime-fractional-second-precision - */ - Assertions.assertEquals(DateTimeUtils.MILLISECONDS_PATTERN.matcher(expectedValue.asText()).replaceAll(StringUtils.EMPTY), - DateTimeUtils.MILLISECONDS_PATTERN.matcher(actualValue.asText()).replaceAll(StringUtils.EMPTY)); - } else { - super.assertSameValue(key, expectedValue, actualValue); - } - } - } diff --git a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java index 69b64e0726d62..0c29d20a296b8 100644 --- a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java @@ -4,11 +4,9 @@ package io.airbyte.integrations.destination.mysql; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; @@ -16,7 +14,6 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; @@ -29,9 +26,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MySQLContainer; @@ -242,27 +237,7 @@ public void testLineBreakCharacters() { // overrides test with a no-op until we handle full UTF-8 in the destination } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - if (DATE.equals(dateTimeFieldNames.get(path))) { - data.put(key.toLowerCase(), DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - protected void assertSameValue(final String key, final JsonNode expectedValue, final JsonNode actualValue) { + protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) { if (expectedValue.isBoolean()) { // Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here assertEquals(expectedValue.asBoolean(), actualValue.asBoolean()); diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java index cbf2922a2bbf0..6016ca02c3be0 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java @@ -4,11 +4,9 @@ package io.airbyte.integrations.destination.mysql; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; @@ -16,7 +14,6 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; @@ -29,9 +26,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MySQLContainer; @@ -245,27 +240,7 @@ public void testLineBreakCharacters() { // overrides test with a no-op until we handle full UTF-8 in the destination } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - if (DATE.equals(dateTimeFieldNames.get(path))) { - data.put(key.toLowerCase(), DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - protected void assertSameValue(final String key, final JsonNode expectedValue, final JsonNode actualValue) { + protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) { if (expectedValue.isBoolean()) { // Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here assertEquals(expectedValue.asBoolean(), actualValue.asBoolean()); diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java index b9ede115c7374..168e017eac94d 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.mysql; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; @@ -18,15 +17,12 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.ssh.SshTunnel; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; /** * Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file @@ -164,27 +160,7 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception { }); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - if (DATE.equals(dateTimeFieldNames.get(path))) { - data.put(key.toLowerCase(), DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - - protected void assertSameValue(final String key, final JsonNode expectedValue, final JsonNode actualValue) { + protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) { if (expectedValue.isBoolean()) { // Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here assertEquals(expectedValue.asBoolean(), actualValue.asBoolean()); diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java index 095dd47d487e5..ce71fbf7ec97c 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java @@ -4,26 +4,18 @@ package io.airbyte.integrations.destination.postgres; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.TimeZone; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; @@ -132,7 +124,6 @@ protected void setup(final TestDestinationEnv testEnv) { db = new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres")) .withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key"); db.start(); - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); } @Override @@ -141,27 +132,4 @@ protected void tearDown(final TestDestinationEnv testEnv) { db.close(); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToPostgresFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java index 348855389397c..17463b521acc2 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationAcceptanceTest.java @@ -4,26 +4,18 @@ package io.airbyte.integrations.destination.postgres; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.TimeZone; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.testcontainers.containers.PostgreSQLContainer; public class PostgresDestinationAcceptanceTest extends DestinationAcceptanceTest { @@ -130,7 +122,6 @@ private List retrieveRecordsFromTable(final String tableName, final St protected void setup(final TestDestinationEnv testEnv) { db = new PostgreSQLContainer<>("postgres:13-alpine"); db.start(); - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); } @Override @@ -139,27 +130,4 @@ protected void tearDown(final TestDestinationEnv testEnv) { db.close(); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToPostgresFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java index ff2e11116e673..4ee8d0ed0f79c 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java @@ -4,9 +4,6 @@ package io.airbyte.integrations.destination.postgres; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.functional.CheckedFunction; @@ -18,15 +15,11 @@ import io.airbyte.integrations.base.ssh.SshBastionContainer; import io.airbyte.integrations.base.ssh.SshTunnel; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.TimeZone; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; import org.testcontainers.containers.PostgreSQLContainer; // todo (cgardens) - likely some of this could be further de-duplicated with @@ -124,7 +117,6 @@ private static Database getDatabaseFromConfig(final JsonNode config) { } private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); final JsonNode config = getConfig(); return SshTunnel.sshWrap( config, @@ -179,27 +171,4 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception { bastion.stopAndCloseContainers(db); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToPostgresFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java index 8110ae94d98a6..a85de5db19452 100644 --- a/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java @@ -157,7 +157,6 @@ NAMESPACE, nullToEmpty(n), e -> fromJsonNode(e).equals(new AirbyteStreamNameNamespacePair(nullToEmpty(streamName), nullToEmpty(namespace)))) .map(e -> e.get("data")) - .distinct() .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java index 471cce663f1c0..92748deaa4087 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java @@ -4,9 +4,6 @@ package io.airbyte.integrations.destination.redshift; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.io.IOs; @@ -16,15 +13,12 @@ import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.nio.file.Path; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; /** * Integration test testing {@link RedshiftCopyS3Destination}. The default Redshift integration test @@ -160,27 +154,4 @@ protected int getMaxRecordValueLimit() { return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToRedshiftFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormat(data.at(path).asText())); - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java index 8a26f38768125..9690a7492f03d 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3AvroDestinationAcceptanceTest.java @@ -4,26 +4,14 @@ package io.airbyte.integrations.destination.s3; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.avro.AvroConstants; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -32,7 +20,6 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; -import org.apache.commons.lang3.StringUtils; public class S3AvroDestinationAcceptanceTest extends S3DestinationAcceptanceTest { @@ -80,69 +67,4 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(final ObjectNode data, final Map dateTimeFieldNames) { - for (final String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - final var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - (DateTimeUtils.getEpochMicros(data.get(pathFields.get(0)).asText()) / 1000) - * 1000); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - (DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.getEpochDay(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - DateTimeUtils.getEpochDay((data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void deserializeNestedObjects(final List messages, final List actualMessages) { - for (final AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - final var iterator = message.getRecord().getData().fieldNames(); - while (iterator.hasNext()) { - final var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - final var data = message.getRecord().getData().get(fieldName).get(f); - final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f, - dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\"")); - try { - ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData)); - } catch (final JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java index 737d5bf9b802d..a91d1f089e0a5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java @@ -11,9 +11,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; @@ -111,15 +108,4 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } - @Override - protected void retrieveRawRecordsAndAssertSameMessages(final AirbyteCatalog catalog, - final List messages, - final String defaultSchema) - throws Exception { - final List actualMessages = retrieveRawRecords(catalog, defaultSchema); - deserializeNestedObjects(messages, actualMessages); - - assertSameMessages(messages, actualMessages, false); - } - } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java index 4ac4d7203ad99..7558ff9096dcf 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3ParquetDestinationAcceptanceTest.java @@ -4,35 +4,22 @@ package io.airbyte.integrations.destination.s3; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; - import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.avro.AvroConstants; import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter; import io.airbyte.integrations.destination.s3.util.AvroRecordHelper; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.avro.generic.GenericData; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; @@ -84,68 +71,4 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } - @Override - public boolean requiresDateTimeConversionForSync() { - return true; - } - - @Override - public void convertDateTime(final ObjectNode data, final Map dateTimeFieldNames) { - for (final String path : dateTimeFieldNames.keySet()) { - if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - final var pathFields = new ArrayList<>(Arrays.asList(path.split("/"))); - pathFields.remove(0); // first element always empty string - // if pathFields.size() == 1 -> /field else /field/nestedField.. - final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0) - : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1)); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - (DateTimeUtils.getEpochMicros(data.get(pathFields.get(0)).asText()) / 1000) - * 1000); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put( - pathFields.get(pathFields.size() - 1), - (DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000); - } - } - case DATE -> { - if (pathFields.size() == 1) { - data.put(pathFields.get(0).toLowerCase(), - DateTimeUtils.getEpochDay(data.get(pathFields.get(0)).asText())); - } else { - ((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1), - DateTimeUtils.getEpochDay((data.at(path).asText()))); - } - } - } - } - } - } - - @Override - protected void deserializeNestedObjects(final List messages, final List actualMessages) { - for (final AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - final var iterator = message.getRecord().getData().fieldNames(); - while (iterator.hasNext()) { - final var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> { - final var data = message.getRecord().getData().get(fieldName).get(f); - final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f, - dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\"")); - try { - ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData)); - } catch (final JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - } - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java index 1f4610e0766c8..28cb59fc4d446 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java @@ -4,14 +4,10 @@ package io.airbyte.integrations.destination.snowflake; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE; -import static io.airbyte.integrations.standardtest.destination.DateTimeUtils.DATE_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import io.airbyte.commons.io.IOs; @@ -24,12 +20,9 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider; -import io.airbyte.integrations.standardtest.destination.DateTimeUtils; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.nio.file.Path; @@ -37,13 +30,9 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.TimeZone; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -173,7 +162,6 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { database = SnowflakeDatabase.getDatabase(config); database.execute(createSchemaQuery); - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); } @Override @@ -213,55 +201,4 @@ public void testSyncWithBillionRecords(final String messagesFilename, final Stri runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false); } - @Override - public boolean requiresDateTimeConversionForNormalizedSync() { - return true; - } - - @Override - public void convertDateTime(ObjectNode data, Map dateTimeFieldNames) { - if (dateTimeFieldNames.keySet().isEmpty()) { - return; - } - for (String path : dateTimeFieldNames.keySet()) { - if (isOneLevelPath(path) && !data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) { - var key = path.replace("/", StringUtils.EMPTY); - switch (dateTimeFieldNames.get(path)) { - case DATE_TIME -> data.put(key.toLowerCase(), - DateTimeUtils.convertToSnowflakeFormat(data.at(path).asText())); - case DATE -> data.put(key.toLowerCase(), - DateTimeUtils.convertToDateFormatWithZeroTime(data.at(path).asText())); - } - } - } - } - - protected void deserializeNestedObjects(List messages, List actualMessages) { - HashSet nestedFieldNames = new HashSet<>(); - for (AirbyteMessage message : messages) { - if (message.getType() == Type.RECORD) { - var iterator = message.getRecord().getData().fieldNames(); - if (iterator.hasNext()) { - var fieldName = iterator.next(); - if (message.getRecord().getData().get(fieldName).isContainerNode()) { - nestedFieldNames.add(fieldName.toUpperCase()); - } - } - } - } - if (actualMessages != null) { - for (AirbyteRecordMessage message : actualMessages) { - nestedFieldNames.stream().filter(name -> message.getData().has(name)).forEach(name -> { - var data = message.getData().get(name).asText(); - try { - ((ObjectNode) message.getData()).put(name, - new ObjectMapper().readTree(data)); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - } - } - }