🐛 Fix data type tests in CdcPostgresSourceDatatypeTest #7339

Merged: 22 commits, Nov 2, 2021
ff3a9f6
Fix data type tests in CdcPostgresSourceComprehensiveTest
sashaNeshcheret Oct 25, 2021
8d3373f
update style format
sashaNeshcheret Oct 25, 2021
6b13bc4
bump version for postgres source
sashaNeshcheret Oct 26, 2021
3939003
bump version for postgres source in json definition
sashaNeshcheret Oct 26, 2021
9136f21
remove unnecessary comments from test and bump version for postgres s…
sashaNeshcheret Oct 26, 2021
6627c7a
Merge branch 'master' into oneshcheret/5382-fix-data-type-cdc-postgres
sashaNeshcheret Oct 26, 2021
e7e9a63
resolved potential conflicts with debezium utils in mssql converter i…
sashaNeshcheret Oct 26, 2021
8a6eba4
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret Oct 26, 2021
aeef608
Merge remote-tracking branch 'origin/oneshcheret/5382-fix-data-type-c…
sashaNeshcheret Oct 26, 2021
83f8bdc
resolved potential conflicts with debezium utils in mssql converter i…
sashaNeshcheret Oct 26, 2021
40ab420
Update notes for money type in postgres.md
sashaNeshcheret Oct 27, 2021
296bc6f
Update docs/integrations/sources/postgres.md
sashaNeshcheret Oct 27, 2021
34e51e5
added test cases for converting data values for postgres cdc, remove …
sashaNeshcheret Oct 27, 2021
f25fd54
remove redundant void message from test
sashaNeshcheret Oct 27, 2021
b5107f6
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret Oct 28, 2021
b1c592e
update style format
sashaNeshcheret Oct 28, 2021
2857a1a
fix time zone in DebeziumConverterUtilsTest
sashaNeshcheret Oct 28, 2021
4c092ff
set utc time zone in DataTypeUtils
sashaNeshcheret Oct 28, 2021
6f059a3
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret Oct 30, 2021
6a1906e
set utc time zone for date format
sashaNeshcheret Oct 30, 2021
019a724
revert changes regarding timezone in date format, disable tests with …
sashaNeshcheret Nov 2, 2021
941102c
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret Nov 2, 2021
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.3.9",
"dockerImageTag": "0.3.13",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
@@ -405,7 +405,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.11
dockerImageTag: 0.3.13
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.RelationalColumn;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DebeziumConverterUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumConverterUtils.class);

private DebeziumConverterUtils() {
throw new UnsupportedOperationException();
}

public static String convertDate(final Object input) {
/**
 * Debezium's documentation does not state which Java types a custom converter may receive for
 * date and time columns. Debezium's own
 * {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(io.debezium.spi.converter.RelationalColumn, io.debezium.spi.converter.CustomConverter.ConverterRegistration)}
 * also handles several input types without explaining when each one occurs, so this method
 * handles the types that plausibly represent date and time values. LocalDateTime is used as the
 * common form because it corresponds to the DATETIME data type in Java.
 */
if (input instanceof LocalDateTime) {
return DataTypeUtils.toISO8601String((LocalDateTime) input);
} else if (input instanceof LocalDate) {
return DataTypeUtils.toISO8601String((LocalDate) input);
} else if (input instanceof Duration) {
return DataTypeUtils.toISO8601String((Duration) input);
} else if (input instanceof Timestamp) {
return DataTypeUtils.toISO8601String(((Timestamp) input).toLocalDateTime());
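} else if (input instanceof Number) {
// The number is treated as epoch milliseconds. Timestamp#toLocalDateTime resolves it in the
// JVM default time zone, so the result depends on the environment the connector runs in.
return DataTypeUtils.toISO8601String(
new Timestamp(((Number) input).longValue()).toLocalDateTime());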
} else if (input instanceof String) {
try {
return LocalDateTime.parse((String) input).toString();
} catch (final DateTimeParseException e) {
LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", input);
return input.toString();
}
}
LOGGER.warn("Uncovered date class type '{}'. Use default converter", input.getClass().getName());
return input.toString();
}

public static Object convertDefaultValue(RelationalColumn field) {
if (field.isOptional()) {
return null;
} else if (field.hasDefaultValue()) {
return field.defaultValue();
}
return null;
}

}
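Review note on the `Number` branch of `convertDate`: `new Timestamp(millis).toLocalDateTime()` resolves the value in the JVM default time zone, which may be why the `convertNumber` test expects `03:01:40` for 100 000 ms. Below is a minimal sketch of a zone-independent variant, assuming the numeric input is epoch milliseconds and that the output should look like the strings asserted in `DebeziumConverterUtilsTest`. `EpochMillisFormatter` and `toUtcIsoString` are hypothetical names, not part of this PR or of `DataTypeUtils`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the PR: formats epoch milliseconds in UTC.
final class EpochMillisFormatter {

  // Assumption: this pattern mirrors the "yyyy-MM-ddTHH:mm:ssZ" strings asserted in the tests.
  private static final DateTimeFormatter ISO_UTC =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");

  private EpochMillisFormatter() {}

  static String toUtcIsoString(final long epochMillis) {
    // Instant carries no zone; pinning the offset to UTC keeps the output
    // independent of the JVM default time zone.
    return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).format(ISO_UTC);
  }

  public static void main(final String[] args) {
    System.out.println(toUtcIsoString(100_000L)); // prints 1970-01-01T00:01:40Z
  }
}
```

With a conversion like this, the expected value in a test no longer depends on where the test runs.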
Original file line number Diff line number Diff line change
@@ -4,14 +4,8 @@

package io.airbyte.integrations.debezium.internals;

import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -61,50 +55,15 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat

if (x instanceof byte[]) {
return new String((byte[]) x);
} else
} else {
return x.toString();
}
});
}

private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), x -> {
if (x == null) {
if (field.isOptional()) {
return null;
} else if (field.hasDefaultValue()) {
return field.defaultValue();
}
return null;
}
/**
* While building this custom converter we were not sure what type debezium could return cause there
* is no mention of it in the documentation. Secondly if you take a look at
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(RelationalColumn, ConverterRegistration)}
* method, even it is handling multiple data types but its not clear under what circumstances which
* data type would be returned. I just went ahead and handled the data types that made sense.
* Secondly, we use LocalDateTime to handle this cause it represents DATETIME datatype in JAVA
*/
if (x instanceof LocalDateTime) {
return DataTypeUtils.toISO8601String((LocalDateTime) x);
} else if (x instanceof LocalDate) {
return DataTypeUtils.toISO8601String((LocalDate) x);
} else if (x instanceof Duration) {
return DataTypeUtils.toISO8601String((Duration) x);
} else if (x instanceof Timestamp) {
return DataTypeUtils.toISO8601String(((Timestamp) x).toLocalDateTime());
} else if (x instanceof Number) {
return DataTypeUtils.toISO8601String(new Timestamp(((Number) x).longValue()).toLocalDateTime());
} else if (x instanceof String) {
try {
return LocalDateTime.parse((String) x).toString();
} catch (final DateTimeParseException e) {
LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", x);
return x.toString();
}
}
LOGGER.warn("Uncovered date class type '{}'. Use default converter", x.getClass().getName());
return x.toString();
});
registration.register(SchemaBuilder.string(),
x -> x == null ? DebeziumConverterUtils.convertDefaultValue(field) : DebeziumConverterUtils.convertDate(x));
}

}
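Review note on `registerText`: `new String((byte[]) x)` decodes with the platform default charset, so the same bytes can produce different text on different hosts. Below is a minimal sketch with an explicit charset, assuming the column bytes are UTF-8 (the PR does not say which encoding the source uses). `TextValueDecoder` and `toText` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PR: converts a raw column value to text.
final class TextValueDecoder {

  private TextValueDecoder() {}

  static String toText(final Object value) {
    if (value instanceof byte[]) {
      // Assumption: the bytes are UTF-8 encoded text.
      return new String((byte[]) value, StandardCharsets.UTF_8);
    }
    // Every other type falls back to its string form, as in the converter.
    return value.toString();
  }

  public static void main(final String[] args) {
    System.out.println(toText("abc".getBytes(StandardCharsets.UTF_8))); // prints abc
  }
}
```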
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);

private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES = {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR"};

@Override
public void configure(Properties props) {}

@Override
public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerDate(field, registration);
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))
|| Arrays.stream(GEOMETRICS_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))
|| Arrays.stream(BIT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
}
}

private void registerText(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

if (x instanceof byte[]) {
return new String((byte[]) x);
} else {
return x.toString();
}
});
}

private void registerDate(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
});
}

private String convertInterval(PGInterval pgInterval) {
StringBuilder resultInterval = new StringBuilder();
formatDateUnit(resultInterval, pgInterval.getYears(), " year ");
formatDateUnit(resultInterval, pgInterval.getMonths(), " mons ");
formatDateUnit(resultInterval, pgInterval.getDays(), " days ");

formatTimeValues(resultInterval, pgInterval);
return resultInterval.toString();
}

private void registerMoney(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof Double) {
BigDecimal result = BigDecimal.valueOf((Double) x);
if (result.compareTo(new BigDecimal("999999999999999")) > 0
|| result.compareTo(new BigDecimal("-999999999999999")) < 0) {
return null;
}
return result.toString();
} else {
return x.toString();
}
});
}

private void formatDateUnit(StringBuilder resultInterval, int dateUnit, String s) {
if (dateUnit != 0) {
resultInterval
.append(dateUnit)
.append(s);
}
}

private void formatTimeValues(StringBuilder resultInterval, PGInterval pgInterval) {
if (isNegativeTime(pgInterval)) {
resultInterval.append("-");
}
// TODO: handle values at the edge of the int range; Math.abs(Integer.MIN_VALUE) is still negative.
int hours = Math.abs(pgInterval.getHours());
int minutes = Math.abs(pgInterval.getMinutes());
int seconds = Math.abs(pgInterval.getWholeSeconds());
resultInterval.append(addFirstDigit(hours));
resultInterval.append(hours);
resultInterval.append(":");
resultInterval.append(addFirstDigit(minutes));
resultInterval.append(minutes);
resultInterval.append(":");
resultInterval.append(addFirstDigit(seconds));
resultInterval.append(seconds);
}

private String addFirstDigit(int value) {
return value <= 9 ? "0" : "";
}

private boolean isNegativeTime(PGInterval pgInterval) {
return pgInterval.getHours() < 0
|| pgInterval.getMinutes() < 0
|| pgInterval.getWholeSeconds() < 0;
}

}
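To make the output of `convertInterval` easier to review, here is a minimal standalone sketch of the same formatting steps. It takes plain `int` fields instead of a `PGInterval` so that it runs without the PostgreSQL driver, and it uses `String.format` for zero padding instead of `addFirstDigit`. `IntervalFormatSketch` and its methods are hypothetical names, and the edge case flagged in the TODO (`Integer.MIN_VALUE`) is not handled here either.

```java
import java.util.Locale;

// Hypothetical stand-in for PostgresConverter#convertInterval, using plain ints.
final class IntervalFormatSketch {

  private IntervalFormatSketch() {}

  static String format(final int years, final int months, final int days,
                       final int hours, final int minutes, final int seconds) {
    final StringBuilder out = new StringBuilder();
    // Date units are emitted only when non-zero, with the same suffixes as the converter.
    appendUnit(out, years, " year ");
    appendUnit(out, months, " mons ");
    appendUnit(out, days, " days ");
    // A single leading minus sign is written if any time component is negative.
    if (hours < 0 || minutes < 0 || seconds < 0) {
      out.append('-');
    }
    // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
    out.append(String.format(Locale.ROOT, "%02d:%02d:%02d",
        Math.abs(hours), Math.abs(minutes), Math.abs(seconds)));
    return out.toString();
  }

  private static void appendUnit(final StringBuilder out, final int value, final String suffix) {
    if (value != 0) {
      out.append(value).append(suffix);
    }
  }

  public static void main(final String[] args) {
    System.out.println(format(1, 2, 3, 4, 5, 6)); // prints 1 year 2 mons 3 days 04:05:06
  }
}
```

Note that the time part is always written, so an interval of zero yields `00:00:00`.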
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.debezium.spi.converter.RelationalColumn;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class DebeziumConverterUtilsTest {

@Test
public void convertDefaultValueTest() {

RelationalColumn relationalColumn = mock(RelationalColumn.class);

when(relationalColumn.isOptional()).thenReturn(true);
Object actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
Assertions.assertNull(actualColumnDefaultValue, "Default value for optional relational column should be null");

when(relationalColumn.isOptional()).thenReturn(false);
when(relationalColumn.hasDefaultValue()).thenReturn(false);
actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
Assertions.assertNull(actualColumnDefaultValue);

when(relationalColumn.isOptional()).thenReturn(false);
when(relationalColumn.hasDefaultValue()).thenReturn(true);
String expectedColumnDefaultValue = "default value";
when(relationalColumn.defaultValue()).thenReturn(expectedColumnDefaultValue);
actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
Assertions.assertEquals(expectedColumnDefaultValue, actualColumnDefaultValue);
}

@Test
public void convertLocalDate() {
LocalDate localDate = LocalDate.of(2021, 1, 1);

String actual = DebeziumConverterUtils.convertDate(localDate);
Assertions.assertEquals("2021-01-01T00:00:00Z", actual);
}

@Test
public void convertLocalTime() {
LocalTime localTime = LocalTime.of(8, 1, 1);
String actual = DebeziumConverterUtils.convertDate(localTime);
Assertions.assertEquals("08:01:01", actual);
}

@Test
public void convertLocalDateTime() {
LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1);

String actual = DebeziumConverterUtils.convertDate(localDateTime);
Assertions.assertEquals("2021-01-01T08:01:01Z", actual);
}

@Test
@Disabled
public void convertDuration() {
Duration duration = Duration.ofHours(100_000);

String actual = DebeziumConverterUtils.convertDate(duration);
Assertions.assertEquals("1981-05-29T20:00:00Z", actual);
}

@Test
public void convertTimestamp() {
LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1);
Timestamp timestamp = Timestamp.valueOf(localDateTime);

String actual = DebeziumConverterUtils.convertDate(timestamp);
Assertions.assertEquals("2021-01-01T08:01:01Z", actual);
}

@Test
@Disabled
public void convertNumber() {
Number number = 100_000;

String actual = DebeziumConverterUtils.convertDate(number);
Assertions.assertEquals("1970-01-01T03:01:40Z", actual);
}

@Test
public void convertStringDateFormat() {
String stringValue = "2021-01-01T00:00:00Z";

String actual = DebeziumConverterUtils.convertDate(stringValue);
Assertions.assertEquals("2021-01-01T00:00:00Z", actual);
}

}
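Review note on `convertStringDateFormat`: `LocalDateTime.parse` uses the ISO local date-time format, which does not accept a trailing `Z`. The input `"2021-01-01T00:00:00Z"` therefore takes the `DateTimeParseException` path in `convertDate` and is returned unchanged, so the test passes through the fallback rather than through a successful parse. Below is a minimal sketch of that branch in isolation. `StringDateBranchSketch` and `parseOrFallback` are hypothetical names, and the logging of the original is omitted.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;

// Hypothetical reduction of the String branch in DebeziumConverterUtils#convertDate.
final class StringDateBranchSketch {

  private StringDateBranchSketch() {}

  static String parseOrFallback(final String input) {
    try {
      // Accepts values such as 2021-01-01T08:01:01, with no zone or offset suffix.
      return LocalDateTime.parse(input).toString();
    } catch (final DateTimeParseException e) {
      // Anything else, including a value with a trailing 'Z', is returned as-is.
      return input;
    }
  }

  public static void main(final String[] args) {
    System.out.println(parseOrFallback("2021-01-01T00:00:00Z")); // prints 2021-01-01T00:00:00Z
    System.out.println(parseOrFallback("2021-01-01T00:00:00"));  // prints 2021-01-01T00:00
  }
}
```

The second call shows a side effect worth knowing: `LocalDateTime.toString()` drops the seconds when they are zero, so successfully parsed values do not always come back in the same shape as the input.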
Original file line number Diff line number Diff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt