Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix temporal data type handling in mysql source #15504

Merged
merged 18 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
Expand All @@ -30,7 +30,7 @@ public class MySQLDateTimeConverter implements CustomConverter<SchemaBuilder, Re

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDateTimeConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"};
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMESTAMP"};

@Override
public void configure(final Properties props) {}
Expand All @@ -42,18 +42,22 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
}
}

/**
* The debezium driver replaces Zero-value by Null even when this column is mandatory. According to
* the doc, it should be done by driver, but it fails.
*/
private Object convertDefaultValueNullDate(final RelationalColumn field) {
final var defaultValue = DebeziumConverterUtils.convertDefaultValue(field);
return (defaultValue == null && !field.isOptional() ? DataTypeUtils.toISO8601String(LocalDate.EPOCH) : defaultValue);
}

private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(),
x -> x == null ? convertDefaultValueNullDate(field) : DebeziumConverterUtils.convertDate(x));
final var fieldType = field.typeName();

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "DATETIME" -> DateTimeConverter.convertToTimestamp(x);
case "DATE" -> DateTimeConverter.convertToDate(x);
case "TIME" -> DateTimeConverter.convertToTime(x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestampWithTimezone(x);
default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
};
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation libs.connectors.testcontainers.mysql

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public class MySqlCdcProperties {

static Properties getDebeziumProperties(JsonNode config) {
static Properties getDebeziumProperties(final JsonNode config) {
final Properties props = new Properties();

// debezium engine configuration
Expand All @@ -26,8 +26,13 @@ static Properties getDebeziumProperties(JsonNode config) {
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");

// snapshot config
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "when_needed");
if (config.has("snapshot_mode")) {
//The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip initial snapshot
props.setProperty("snapshot.mode", config.get("snapshot_mode").asText());
} else {
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "when_needed");
}
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while Airbyte is
// taking a snapshot. There is a risk involved that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.SourceOperations;
import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.airbyte.protocol.models.JsonSchemaType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,7 +80,8 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex);
case DECIMAL, DECIMAL_UNSIGNED -> putBigDecimal(json, columnName, resultSet, colIndex);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case DATETIME, TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
// The returned year value can either be a java.sql.Short (when yearIsDateType=false)
// or a java.sql.Date with the date set to January 1st, at midnight (when yearIsDateType=true).
Expand Down Expand Up @@ -125,7 +133,8 @@ public void setStatementField(final PreparedStatement preparedStatement,
case FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED -> setDouble(preparedStatement, parameterIndex, value);
case DECIMAL, DECIMAL_UNSIGNED -> setDecimal(preparedStatement, parameterIndex, value);
case DATE -> setDate(preparedStatement, parameterIndex, value);
case DATETIME, TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
case DATETIME -> setTimestamp(preparedStatement, parameterIndex, value);
case TIMESTAMP -> setTimestampWithTimezone(preparedStatement, parameterIndex, value);
case TIME -> setTime(preparedStatement, parameterIndex, value);
case YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET -> setString(preparedStatement, parameterIndex, value);
case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> setBinary(preparedStatement, parameterIndex, value);
Expand Down Expand Up @@ -168,7 +177,22 @@ public MysqlType getFieldType(final JsonNode field) {
}

@Override
public JsonSchemaType getJsonType(MysqlType mysqlType) {
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DateTimeConverter.convertToDate(getObject(resultSet, index, LocalDate.class)));
}

@Override
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime.class)));
}

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DateTimeConverter.convertToTimestamp(getObject(resultSet, index, LocalDateTime.class)));
}

@Override
public JsonSchemaType getJsonType(final MysqlType mysqlType) {
return switch (mysqlType) {
case
// TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link
Expand All @@ -179,8 +203,54 @@ public JsonSchemaType getJsonType(MysqlType mysqlType) {
case NULL -> JsonSchemaType.NULL;
// BIT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link getFieldType}
case BIT, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> JsonSchemaType.STRING_BASE_64;
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
case DATETIME -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE;
case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE;
case DATE -> JsonSchemaType.STRING_DATE;
default -> JsonSchemaType.STRING;
};
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalDate.parse(value));
} catch (final DateTimeParseException e) {
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
LOGGER.warn("Exception occurred while trying to parse value for date column the new way, trying the old way", e);
super.setDate(preparedStatement, parameterIndex, value);
}
}

@Override
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
} catch (final DateTimeParseException e) {
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
LOGGER.warn("Exception occurred while trying to parse value for datetime column the new way, trying the old way", e);
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
}
}

private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
} catch (final DateTimeParseException e) {
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
LOGGER.warn("Exception occurred while trying to parse value for timestamp column the new way, trying the old way", e);
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
}
}

@Override
protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
} catch (final DateTimeParseException e) {
LOGGER.warn("Exception occurred while trying to parse value for time column the new way, trying the old way", e);
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
super.setTime(preparedStatement, parameterIndex, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,34 +235,28 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("null", "'2021-01-01'")
.addExpectedValues(null, "2021-01-01T00:00:00Z")
.addExpectedValues(null, "2021-01-01")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'2005-10-10 23:22:21'")
.addExpectedValues(null, "2005-10-10T23:22:21.000000Z")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'")
.addExpectedValues(null, "2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamp")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'")
.addExpectedValues(null, null, null, null)
.build());
addTimestampDataTypeTest();

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("null", "'-23:59:59'", "'00:00:00'")
.addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z")
.addInsertValues("null", "'-22:59:59'", "'23:59:59'", "'00:00:00'")
.addExpectedValues(null, "22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -384,13 +378,7 @@ protected void initTests() {
.addExpectedValues(StringUtils.leftPad("0", 1048000, "0"), "test")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("json")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'{\"a\": 10, \"b\": 15}'", "'{\"fóo\": \"bär\"}'", "'{\"春江潮水连海平\": \"海上明月共潮生\"}'")
.addExpectedValues(null, "{\"a\": 10, \"b\": 15}", "{\"fóo\": \"bär\"}", "{\"春江潮水连海平\": \"海上明月共潮生\"}")
.build());
addJsonDataTypeTest();

addDataTypeTestData(
TestDataHolder.builder()
Expand All @@ -412,6 +400,26 @@ protected void initTests() {

}

protected void addJsonDataTypeTest() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("json")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'{\"a\": 10, \"b\": 15}'", "'{\"fóo\": \"bär\"}'", "'{\"春江潮水连海平\": \"海上明月共潮生\"}'")
.addExpectedValues(null, "{\"a\": 10, \"b\": 15}", "{\"fóo\": \"bär\"}", "{\"春江潮水连海平\": \"海上明月共潮生\"}")
.build());
}

protected void addTimestampDataTypeTest() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamp")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
.addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'", "'2022-08-09T10:17:16.161342Z'")
.addExpectedValues(null, null, null, null, "2022-08-09T10:17:16.000000Z")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are these returning null? is it because the insert values aren't valid timestamps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

.build());
}

private String getLogString(final int length) {
final int maxLpadLength = 262144;
final StringBuilder stringBuilder = new StringBuilder("concat(");
Expand Down
Loading