diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java index e99687eb39e5f..008167aff3367 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java @@ -5,11 +5,11 @@ package io.airbyte.integrations.debezium.internals; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.DataTypeUtils; +import io.airbyte.db.jdbc.DateTimeConverter; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; -import java.time.LocalDate; import java.util.Arrays; +import java.util.Locale; import java.util.Properties; import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; @@ -30,7 +30,7 @@ public class MySQLDateTimeConverter implements CustomConverter registration) { - registration.register(SchemaBuilder.string(), - x -> x == null ? convertDefaultValueNullDate(field) : DebeziumConverterUtils.convertDate(x)); + final var fieldType = field.typeName(); + + registration.register(SchemaBuilder.string().optional(), x -> { + if (x == null) { + return DebeziumConverterUtils.convertDefaultValue(field); + } + + return switch (fieldType.toUpperCase(Locale.ROOT)) { + case "DATETIME" -> DateTimeConverter.convertToTimestamp(x); + case "DATE" -> DateTimeConverter.convertToDate(x); + case "TIME" -> DateTimeConverter.convertToTime(x); + case "TIMESTAMP" -> DateTimeConverter.convertToTimestampWithTimezone(x); + default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT)); + }; + }); } } diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 1cd439f3f7984..08797b63673fe 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -24,6 +24,7 @@ dependencies { testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2')) testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) testImplementation 'org.apache.commons:commons-lang3:3.11' + testImplementation 'org.hamcrest:hamcrest-all:1.3' testImplementation libs.connectors.testcontainers.mysql integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index 4dcccac510f8b..cf30b04329a38 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -9,7 +9,7 @@ public class MySqlCdcProperties { - static Properties getDebeziumProperties(JsonNode config) { + static Properties getDebeziumProperties(final JsonNode config) { final Properties props = new Properties(); // debezium engine configuration @@ -26,8 +26,13 @@ static Properties getDebeziumProperties(JsonNode config) { props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); // snapshot config - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode - props.setProperty("snapshot.mode", "when_needed"); + if (config.has("snapshot_mode")) { + //The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip initial snapshot + props.setProperty("snapshot.mode", config.get("snapshot_mode").asText()); + } else { + // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode + props.setProperty("snapshot.mode", "when_needed"); + } // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode // This is to make sure other database clients are allowed to write to a table while Airbyte is // taking a snapshot. There is a risk involved that diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index 91de0cf633384..d42d0d8408810 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -20,10 +20,17 @@ import io.airbyte.db.DataTypeUtils; import io.airbyte.db.SourceOperations; import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations; +import io.airbyte.db.jdbc.DateTimeConverter; import io.airbyte.protocol.models.JsonSchemaType; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.format.DateTimeParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +80,8 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex); case DECIMAL, DECIMAL_UNSIGNED -> putBigDecimal(json, columnName, resultSet, colIndex); case DATE -> putDate(json, columnName, resultSet, colIndex); - case DATETIME, TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); + case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex); + case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); case TIME -> putTime(json, columnName, resultSet, colIndex); // The returned year value can either be a java.sql.Short (when yearIsDateType=false) // or a java.sql.Date with the date set to January 1st, at midnight (when yearIsDateType=true). @@ -125,7 +133,8 @@ public void setStatementField(final PreparedStatement preparedStatement, case FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED -> setDouble(preparedStatement, parameterIndex, value); case DECIMAL, DECIMAL_UNSIGNED -> setDecimal(preparedStatement, parameterIndex, value); case DATE -> setDate(preparedStatement, parameterIndex, value); - case DATETIME, TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); + case DATETIME -> setTimestamp(preparedStatement, parameterIndex, value); + case TIMESTAMP -> setTimestampWithTimezone(preparedStatement, parameterIndex, value); case TIME -> setTime(preparedStatement, parameterIndex, value); case YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET -> setString(preparedStatement, parameterIndex, value); case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> setBinary(preparedStatement, parameterIndex, value); @@ -168,7 +177,22 @@ public MysqlType getFieldType(final JsonNode field) { } @Override - public JsonSchemaType getJsonType(MysqlType mysqlType) { + protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + node.put(columnName, DateTimeConverter.convertToDate(getObject(resultSet, index, LocalDate.class))); + } + + @Override + protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + node.put(columnName, DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime.class))); + } + + @Override + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + node.put(columnName, DateTimeConverter.convertToTimestamp(getObject(resultSet, index, LocalDateTime.class))); + } + + @Override + public JsonSchemaType getJsonType(final MysqlType mysqlType) { return switch (mysqlType) { case // TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link @@ -179,8 +203,54 @@ public JsonSchemaType getJsonType(MysqlType mysqlType) { case NULL -> JsonSchemaType.NULL; // BIT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link getFieldType} case BIT, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> JsonSchemaType.STRING_BASE_64; + case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE; + case DATETIME -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE; + case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE; + case DATE -> JsonSchemaType.STRING_DATE; default -> JsonSchemaType.STRING; }; } + @Override + protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalDate.parse(value)); + } catch (final DateTimeParseException e) { + // This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504 + LOGGER.warn("Exception occurred while trying to parse value for date column the new way, trying the old way", e); + super.setDate(preparedStatement, parameterIndex, value); + } + } + + @Override + protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } catch (final DateTimeParseException e) { + // This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504 + LOGGER.warn("Exception occurred while trying to parse value for datetime column the new way, trying the old way", e); + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } + } + + private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } catch (final DateTimeParseException e) { + // This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504 + LOGGER.warn("Exception occurred while trying to parse value for timestamp column the new way, trying the old way", e); + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } + } + + @Override + protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } catch (final DateTimeParseException e) { + LOGGER.warn("Exception occurred while trying to parse value for time column the new way, trying the old way", e); + // This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504 + super.setTime(preparedStatement, parameterIndex, value); + } + } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java index c9972beed5030..d50e1b1df010c 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java @@ -235,34 +235,28 @@ protected void initTests() { addDataTypeTestData( TestDataHolder.builder() .sourceType("date") - .airbyteType(JsonSchemaType.STRING) + .airbyteType(JsonSchemaType.STRING_DATE) .addInsertValues("null", "'2021-01-01'") - .addExpectedValues(null, "2021-01-01T00:00:00Z") + .addExpectedValues(null, "2021-01-01") .build()); addDataTypeTestData( TestDataHolder.builder() .sourceType("datetime") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'2005-10-10 23:22:21'") - .addExpectedValues(null, "2005-10-10T23:22:21.000000Z") + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) + .addInsertValues("null", "'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'") + .addExpectedValues(null, "2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000") .build()); - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timestamp") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'") - .addExpectedValues(null, null, null, null) - .build()); + addTimestampDataTypeTest(); addDataTypeTestData( TestDataHolder.builder() .sourceType("time") - .airbyteType(JsonSchemaType.STRING) + .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) // JDBC driver can process only "clock"(00:00:00-23:59:59) values. - .addInsertValues("null", "'-23:59:59'", "'00:00:00'") - .addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z") + .addInsertValues("null", "'-22:59:59'", "'23:59:59'", "'00:00:00'") + .addExpectedValues(null, "22:59:59.000000", "23:59:59.000000", "00:00:00.000000") .build()); addDataTypeTestData( @@ -384,13 +378,7 @@ protected void initTests() { .addExpectedValues(StringUtils.leftPad("0", 1048000, "0"), "test") .build()); - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("json") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'", "'{\"fóo\": \"bär\"}'", "'{\"春江潮水连海平\": \"海上明月共潮生\"}'") - .addExpectedValues(null, "{\"a\": 10, \"b\": 15}", "{\"fóo\": \"bär\"}", "{\"春江潮水连海平\": \"海上明月共潮生\"}") - .build()); + addJsonDataTypeTest(); addDataTypeTestData( TestDataHolder.builder() @@ -412,6 +400,26 @@ protected void initTests() { } + protected void addJsonDataTypeTest() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("json") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'{\"a\": 10, \"b\": 15}'", "'{\"fóo\": \"bär\"}'", "'{\"春江潮水连海平\": \"海上明月共潮生\"}'") + .addExpectedValues(null, "{\"a\": 10, \"b\": 15}", "{\"fóo\": \"bär\"}", "{\"春江潮水连海平\": \"海上明月共潮生\"}") + .build()); + } + + protected void addTimestampDataTypeTest() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamp") + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) + .addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'", "'2022-08-09T10:17:16.161342Z'") + .addExpectedValues(null, null, null, null, "2022-08-09T10:17:16.000000Z") + .build()); + } + private String getLogString(final int length) { final int maxLpadLength = 262144; final StringBuilder stringBuilder = new StringBuilder("concat("); diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java new file mode 100644 index 0000000000000..d93035155c33c --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.standardtest.source.TestDataHolder; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.JsonSchemaType; +import java.util.List; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.testcontainers.containers.MySQLContainer; + +public class CdcBinlogsMySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest { + + private DSLContext dslContext; + private JsonNode stateAfterFirstSync; + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + dslContext.close(); + container.close(); + } + + @Override + protected List runRead(ConfiguredAirbyteCatalog configuredCatalog) throws Exception { + if (stateAfterFirstSync == null) { + throw new RuntimeException("stateAfterFirstSync is null"); + } + return super.runRead(configuredCatalog, stateAfterFirstSync); + } + + @Override + protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + final Database database = setupDatabase(); + initTests(); + for (final TestDataHolder test : testDataHolders) { + database.query(ctx -> { + ctx.fetch(test.getCreateSqlQuery()); + return null; + }); + } + + final ConfiguredAirbyteStream dummyTableWithData = createDummyTableWithData(database); + final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); + catalog.getStreams().add(dummyTableWithData); + + final List allMessages = super.runRead(catalog); + if (allMessages.size() != 2) { + throw new RuntimeException("First sync should only generate 2 records"); + } + final List stateAfterFirstBatch = extractStateMessages(allMessages); + if (stateAfterFirstBatch == null || stateAfterFirstBatch.isEmpty()) { + throw new RuntimeException("stateAfterFirstBatch should not be null or empty"); + } + stateAfterFirstSync = Jsons.jsonNode(stateAfterFirstBatch); + if (stateAfterFirstSync == null) { + throw new RuntimeException("stateAfterFirstSync should not be null"); + } + for (final TestDataHolder test : testDataHolders) { + database.query(ctx -> { + test.getInsertSqlQueries().forEach(ctx::fetch); + return null; + }); + } + } + + @Override + protected Database setupDatabase() throws Exception { + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, container.getHost()) + .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) + .put(JdbcUtils.USERNAME_KEY, container.getUsername()) + .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) + .put("replication_method", MySqlSource.ReplicationMethod.CDC) + .build()); + + dslContext = DSLContextFactory.create( + config.get(JdbcUtils.USERNAME_KEY).asText(), + config.get(JdbcUtils.PASSWORD_KEY).asText(), + DatabaseDriver.MYSQL.getDriverClassName(), + String.format(DatabaseDriver.MYSQL.getUrlFormatString(), + config.get(JdbcUtils.HOST_KEY).asText(), + config.get(JdbcUtils.PORT_KEY).asInt(), + config.get(JdbcUtils.DATABASE_KEY).asText()), + SQLDialect.MYSQL); + final Database database = new Database(dslContext); + + // It disable strict mode in the DB and allows to insert specific values. + // For example, it's possible to insert date with zero values "2021-00-00" + database.query(ctx -> ctx.fetch("SET @@sql_mode=''")); + + revokeAllPermissions(); + grantCorrectPermissions(); + + return database; + } + + private void revokeAllPermissions() { + executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); + } + + private void grantCorrectPermissions() { + executeQuery( + "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + + container.getUsername() + "@'%';"); + } + + private void executeQuery(final String query) { + try (final DSLContext dslContext = DSLContextFactory.create( + "root", + "test", + DatabaseDriver.MYSQL.getDriverClassName(), + String.format(DatabaseDriver.MYSQL.getUrlFormatString(), + container.getHost(), + container.getFirstMappedPort(), + container.getDatabaseName()), + SQLDialect.MYSQL)) { + final Database database = new Database(dslContext); + database.query( + ctx -> ctx + .execute(query)); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean testCatalog() { + return true; + } + + @Override + protected void addTimestampDataTypeTest() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamp") + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) + .addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'", "'2022-08-09T10:17:16.161342Z'") + .addExpectedValues(null, "1970-01-01T00:00:00.000000Z", "1970-01-01T00:00:00.000000Z", "1970-01-01T00:00:00.000000Z", + "2022-08-09T10:17:16.000000Z") + .build()); + } + + @Override + protected void addJsonDataTypeTest() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("json") + .airbyteType(JsonSchemaType.STRING) + .addInsertValues("null", "'{\"a\":10,\"b\":15}'", "'{\"fóo\":\"bär\"}'", "'{\"春江潮水连海平\":\"海上明月共潮生\"}'") + .addExpectedValues(null, "{\"a\":10,\"b\":15}", "{\"fóo\":\"bär\"}", "{\"春江潮水连海平\":\"海上明月共潮生\"}") + .build()); + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java similarity index 93% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java index 11a644fffb113..566d5b6cecabe 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java @@ -15,7 +15,7 @@ import org.jooq.SQLDialect; import org.testcontainers.containers.MySQLContainer; -public class CdcMySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest { +public class CdcInitialSnapshotMySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest { private DSLContext dslContext; @@ -37,6 +37,7 @@ protected Database setupDatabase() throws Exception { .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put("replication_method", MySqlSource.ReplicationMethod.CDC) + .put("snapshot_mode", "initial_only") .build()); dslContext = DSLContextFactory.create( @@ -89,4 +90,9 @@ private void executeQuery(final String query) { } } + @Override + public boolean testCatalog() { + return true; + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 9a0ffef353982..29f199870958b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -117,83 +117,6 @@ public void tearDown() { } } - @Test - public void fullRefreshAndCDCShouldReturnSameRecords() throws Exception { - JsonNode record1 = Jsons.jsonNode(ImmutableMap.of( - "id", 1, - "bool_col", true, - "tiny_int_one_col", true)); - ((ObjectNode) record1).put("tiny_int_two_col", (short) 80); - JsonNode record2 = Jsons.jsonNode(ImmutableMap.of( - "id", 2, - "bool_col", false, - "tiny_int_one_col", false)); - ((ObjectNode) record2).put("tiny_int_two_col", (short) 90); - ImmutableList records = ImmutableList.of(record1, record2); - Set originalData = new HashSet<>(records); - setupForComparisonBetweenFullRefreshAndCDCSnapshot(records); - - AirbyteCatalog discover = source.discover(config); - List streams = discover.getStreams(); - - assertEquals(streams.size(), 1); - JsonNode jsonSchema = streams.get(0).getJsonSchema().get("properties"); - assertEquals(jsonSchema.get("id").get("type").asText(), "number"); - assertEquals(jsonSchema.get("bool_col").get("type").asText(), "boolean"); - assertEquals(jsonSchema.get("tiny_int_one_col").get("type").asText(), "boolean"); - assertEquals(jsonSchema.get("tiny_int_two_col").get("type").asText(), "number"); - - AirbyteCatalog catalog = new AirbyteCatalog().withStreams(streams); - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers - .toDefaultConfiguredCatalog(catalog); - configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.FULL_REFRESH)); - - Set dataFromFullRefresh = extractRecordMessages( - AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - .stream() - .map(AirbyteRecordMessage::getData).collect(Collectors.toSet()); - - configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL)); - Set dataFromDebeziumSnapshot = - extractRecordMessages(AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - .stream() - .map(airbyteRecordMessage -> { - JsonNode data = airbyteRecordMessage.getData(); - removeCDCColumns((ObjectNode) data); - /** - * Debezium reads TINYINT (expect for TINYINT(1)) as IntNode while FullRefresh reads it as Short Ref - * : {@link io.airbyte.db.jdbc.JdbcUtils#setJsonField(java.sql.ResultSet, int, ObjectNode)} -> case - * TINYINT, SMALLINT -> o.put(columnName, r.getShort(i)); - */ - ((ObjectNode) data) - .put("tiny_int_two_col", (short) data.get("tiny_int_two_col").asInt()); - return data; - }) - .collect(Collectors.toSet()); - - assertEquals(dataFromFullRefresh, originalData); - assertEquals(dataFromFullRefresh, dataFromDebeziumSnapshot); - } - - private void setupForComparisonBetweenFullRefreshAndCDCSnapshot(ImmutableList data) { - executeQuery("CREATE DATABASE " + "test_schema" + ";"); - executeQuery(String.format( - "CREATE TABLE %s.%s(%s INTEGER, %s Boolean, %s TINYINT(1), %s TINYINT(2), PRIMARY KEY (%s));", - "test_schema", "table_with_tiny_int", "id", "bool_col", "tiny_int_one_col", - "tiny_int_two_col", "id")); - - for (JsonNode record : data) { - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema", - "table_with_tiny_int", - "id", "bool_col", "tiny_int_one_col", "tiny_int_two_col", - record.get("id").asInt(), record.get("bool_col").asBoolean(), - record.get("tiny_int_one_col").asBoolean() ? 99 : -99, record.get("tiny_int_two_col").asInt())); - } - - ((ObjectNode) config).put("database", "test_schema"); - } - @Override protected CdcTargetPosition cdcLatestTargetPosition() { DataSource dataSource = DataSourceFactory.create( @@ -306,78 +229,6 @@ protected AirbyteCatalog expectedCatalogForDiscover() { return expectedCatalog; } - // TODO : Enable this test once we fix handling of DATETIME values - @Test - @Disabled - public void dateTimeDataTypeTest() throws Exception { - JsonNode record1 = Jsons.jsonNode(ImmutableMap.of( - "id", 1, - "datetime_col", "\'2013-09-05T10:10:02\'")); - JsonNode record2 = Jsons.jsonNode(ImmutableMap.of( - "id", 2, - "datetime_col", "\'2013-09-06T10:10:02\'")); - ImmutableList records = ImmutableList.of(record1, record2); - setupForDateTimeDataTypeTest(records); - Set originalData = records.stream().peek(c -> { - String dateTimeValue = c.get("datetime_col").asText(); - ((ObjectNode) c).put("datetime_col", dateTimeValue.substring(1, dateTimeValue.length() - 1)); - }).collect(Collectors.toSet()); - - AirbyteCatalog discover = source.discover(config); - List streams = discover.getStreams(); - - assertEquals(streams.size(), 1); - JsonNode jsonSchema = streams.get(0).getJsonSchema().get("properties"); - assertEquals(jsonSchema.get("id").get("type").asText(), "number"); - assertEquals(jsonSchema.get("datetime_col").get("type").asText(), "string"); - - AirbyteCatalog catalog = new AirbyteCatalog().withStreams(streams); - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); - - configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL)); - Set dataFromDebeziumSnapshot = - extractRecordMessages(AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - .stream() - .map(airbyteRecordMessage -> { - JsonNode data = airbyteRecordMessage.getData(); - removeCDCColumns((ObjectNode) data); - return data; - }) - .collect(Collectors.toSet()); - - assertEquals(originalData, dataFromDebeziumSnapshot); - - // TODO: Fix full refresh (non-cdc) mode. The value of the datetime_col is adjusted by the TIMEZONE - // the code is running in, - // in my case it got adjusted to IST i.e. "2013-09-05T15:40:02Z" and "2013-09-06T15:40:02Z". - // configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.FULL_REFRESH)); - // Set dataFromFullRefresh = extractRecordMessages( - // AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) - // .stream() - // .map(AirbyteRecordMessage::getData).collect(Collectors.toSet()); - // assertEquals(dataFromFullRefresh, originalData); - } - - private void setupForDateTimeDataTypeTest(ImmutableList data) { - executeQuery("CREATE DATABASE " + "test_schema" + ";"); - executeQuery(String.format( - "CREATE TABLE %s.%s(%s INTEGER, %s DATETIME, PRIMARY KEY (%s));", - "test_schema", "table_with_date_time", "id", "datetime_col", "id")); - - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s) VALUES (%s, %s);", "test_schema", - "table_with_date_time", - "id", "datetime_col", - data.get(0).get("id").asInt(), data.get(0).get("datetime_col").asText())); - - executeQuery(String - .format("INSERT INTO %s.%s (%s, %s) VALUES (%s, %s);", "test_schema", - "table_with_date_time", - "id", "datetime_col", - data.get(1).get("id").asInt(), data.get(1).get("datetime_col").asText())); - ((ObjectNode) config).put("database", "test_schema"); - } - @Test protected void syncShouldHandlePurgedLogsGracefully() throws Exception { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index ab429f263ff09..a15791cb19e5f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -18,9 +18,22 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; import java.sql.Connection; import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -120,4 +133,90 @@ void testSpec() throws Exception { assertEquals(expected, actual); } + @Override + protected AirbyteCatalog getCatalog(final String defaultNamespace) { + return new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + TABLE_NAME, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_WITHOUT_PK, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(Collections.emptyList()), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_COMPOSITE_PK, + defaultNamespace, + Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), + Field.of(COL_LAST_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey( + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + } + + @Override + protected void incrementalDateCheck() throws Exception { + incrementalCursorCheck( + COL_UPDATED_AT, + "2005-10-18", + "2006-10-19", + List.of(getTestMessages().get(1), getTestMessages().get(2))); + } + + @Override + protected List getTestMessages() { + return List.of( + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_1, + COL_NAME, "picard", + COL_UPDATED_AT, "2004-10-19")))), + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_2, + COL_NAME, "crusher", + COL_UPDATED_AT, + "2005-10-19")))), + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_3, + COL_NAME, "vash", + COL_UPDATED_AT, "2006-10-19"))))); + } + + @Override + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { + final List expectedMessages = new ArrayList<>(); + expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_4, + COL_NAME, "riker", + COL_UPDATED_AT, "2006-10-19"))))); + expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_5, + COL_NAME, "data", + COL_UPDATED_AT, "2006-10-19"))))); + final DbStreamState state = new DbStreamState() + .withStreamName(streamName) + .withStreamNamespace(namespace) + .withCursorField(List.of(COL_ID)) + .withCursor("5"); + expectedMessages.addAll(createExpectedTestMessages(List.of(state))); + return expectedMessages; + } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java new file mode 100644 index 0000000000000..9594647287ddf --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java @@ -0,0 +1,302 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.mysql.cj.MysqlType; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.jdbc.DateTimeConverter; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; + +public class MySqlSourceOperationsTest { + + private final MySqlSourceOperations sqlSourceOperations = new MySqlSourceOperations(); + private MySQLContainer container; + private Database database; + + @BeforeEach + private void init() { + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + database = new Database(DSLContextFactory.create( + "root", + "test", + DRIVER_CLASS, + String.format("jdbc:mysql://%s:%s", + container.getHost(), + container.getFirstMappedPort()), + SQLDialect.MYSQL)); + } + + @AfterEach + public void tearDown() { + try { + container.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void dateColumnAsCursor() throws SQLException { + final String tableName = container.getDatabaseName() + ".table_with_date"; + final String cursorColumn = "cursor_column"; + executeQuery("CREATE TABLE " + tableName + "(id INTEGER PRIMARY KEY, " + cursorColumn + " DATE);"); + + final List expectedRecords = new ArrayList<>(); + for (int i = 1; i <= 4; i++) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + jsonNode.put("id", i); + final LocalDate cursorValue = LocalDate.of(2019, 1, i); + jsonNode.put("cursor_column", DateTimeConverter.convertToDate(cursorValue)); + executeQuery("INSERT INTO " + tableName + " VALUES (" + i + ", '" + cursorValue + "');"); + if (i >= 2) { + expectedRecords.add(jsonNode); + } + } + + final List actualRecords = new ArrayList<>(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATE, DateTimeConverter.convertToDate(LocalDate.of(2019, 1, 1))); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + assertThat(actualRecords, containsInAnyOrder(expectedRecords.toArray())); + + // Test to check backward compatibility for connectors created before PR https://github.com/airbytehq/airbyte/pull/15504 + actualRecords.clear(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATE, "2019-01-01T00:00:00Z"); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + assertThat(actualRecords, containsInAnyOrder(expectedRecords.toArray())); + } + + @Test + public void timeColumnAsCursor() throws SQLException { + final String tableName = container.getDatabaseName() + ".table_with_time"; + final String cursorColumn = "cursor_column"; + executeQuery("CREATE TABLE " + tableName + "(id INTEGER PRIMARY KEY, " + cursorColumn + " TIME);"); + + final List expectedRecords = new ArrayList<>(); + for (int i = 1; i <= 4; i++) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + jsonNode.put("id", i); + final LocalTime cursorValue = LocalTime.of(20, i, 0); + jsonNode.put("cursor_column", DateTimeConverter.convertToTime(cursorValue)); + executeQuery("INSERT INTO " + tableName + " VALUES (" + i + ", '" + cursorValue + "');"); + if (i >= 2) { + expectedRecords.add(jsonNode); + } + } + + final List actualRecords = new ArrayList<>(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIME, DateTimeConverter.convertToTime(LocalTime.of(20, 1, 0))); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + assertThat(actualRecords, containsInAnyOrder(expectedRecords.toArray())); + + // Test to check backward compatibility for connectors created before PR https://github.com/airbytehq/airbyte/pull/15504 + actualRecords.clear(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIME, "1970-01-01T20:01:00Z"); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + } + + @Test + public void dateTimeColumnAsCursor() throws SQLException { + final String tableName = container.getDatabaseName() + ".table_with_datetime"; + final String cursorColumn = "cursor_column"; + executeQuery("CREATE TABLE " + tableName + "(id INTEGER PRIMARY KEY, " + cursorColumn + " DATETIME);"); + + final List expectedRecords = new ArrayList<>(); + for (int i = 1; i <= 4; i++) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + jsonNode.put("id", i); + final LocalDateTime cursorValue = LocalDateTime.of(2019, i, 20, 3, 0, 0); + jsonNode.put("cursor_column", DateTimeConverter.convertToTimestamp(cursorValue)); + executeQuery("INSERT INTO " + tableName + " VALUES (" + i + ", '" + cursorValue + "');"); + if (i >= 2) { + expectedRecords.add(jsonNode); + } + } + + final List actualRecords = new ArrayList<>(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATETIME, + DateTimeConverter.convertToTimestamp(LocalDateTime.of(2019, 1, 20, 3, 0, 0))); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + assertThat(actualRecords, containsInAnyOrder(expectedRecords.toArray())); + + // Test to check backward compatibility for connectors created before PR https://github.com/airbytehq/airbyte/pull/15504 + actualRecords.clear(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATETIME, "2019-01-20T03:00:00.000000Z"); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + assertThat(actualRecords, containsInAnyOrder(expectedRecords.toArray())); + } + + @Test + public void timestampColumnAsCursor() throws SQLException { + final String tableName = container.getDatabaseName() + ".table_with_timestamp"; + final String cursorColumn = "cursor_column"; + executeQuery("CREATE TABLE " + tableName + "(id INTEGER PRIMARY KEY, " + cursorColumn + " timestamp);"); + + final List expectedRecords = new ArrayList<>(); + for (int i = 1; i <= 4; i++) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + jsonNode.put("id", i); + final Instant cursorValue = Instant.ofEpochSecond(1660298508L).plusSeconds(i - 1); + jsonNode.put("cursor_column", DateTimeConverter.convertToTimestampWithTimezone(cursorValue)); + executeQuery("INSERT INTO " + tableName + " VALUES (" + i + ", '" + Timestamp.from(cursorValue) + "');"); + if (i >= 2) { + expectedRecords.add(jsonNode); + } + } + + final List actualRecords = new ArrayList<>(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIMESTAMP, + DateTimeConverter.convertToTimestampWithTimezone(Instant.ofEpochSecond(1660298508L))); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + + Assertions.assertEquals(3, actualRecords.size()); + + // Test to check backward compatibility for connectors created before PR https://github.com/airbytehq/airbyte/pull/15504 + actualRecords.clear(); + try (final Connection connection = container.createConnection("")) { + final PreparedStatement preparedStatement = connection.prepareStatement( + "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); + sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIMESTAMP, Instant.ofEpochSecond(1660298508L).toString()); + + try (final ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + } + actualRecords.add(jsonNode); + } + } + } + Assertions.assertEquals(3, actualRecords.size()); + } + + protected void executeQuery(final String query) { + try { + database.query( + ctx -> ctx + .execute(query)); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + +}