Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Postgres Source: fixed unsupported date-time datatypes during incremental sync #13655

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.21
dockerImageTag: 0.4.22
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6719,7 +6719,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.21"
- dockerImage: "airbyte/source-postgres:0.4.22"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception {

setEmittedAtToNull(actualMessages);

final List<AirbyteMessage> expectedMessages = getAirbyteMessagesReadOneColumn();
assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesReadOneColumn() {
final List<AirbyteMessage> expectedMessages = getTestMessages().stream()
.map(Jsons::clone)
.peek(m -> {
Expand All @@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception {
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
return expectedMessages;
}

@Test
Expand Down Expand Up @@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception {
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING)));

final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
m.getRecord().setNamespace(getDefaultNamespace());
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2);
expectedMessages.addAll(secondStreamExpectedMessages);
}

Expand All @@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception {
assertTrue(actualMessages.containsAll(expectedMessages));
}

/**
 * Builds the records expected for a second stream from the shared test messages.
 *
 * <p>Each test message is cloned, pointed at {@code streamName2} in the default namespace,
 * stripped of {@code COL_UPDATED_AT}, and has its {@code COL_ID} value replaced by the result
 * of {@code convertIdBasedOnDatabase}.
 *
 * @param streamName2 stream name to set on every returned record
 * @return the adjusted copies, in the same order as {@code getTestMessages()}
 */
protected List<AirbyteMessage> getAirbyteMessagesSecondSync(String streamName2) {
  final List<AirbyteMessage> secondStreamMessages = new ArrayList<>();
  for (final AirbyteMessage original : getTestMessages()) {
    // Work on a clone so the shared test messages are never mutated.
    final AirbyteMessage copy = Jsons.clone(original);
    copy.getRecord().setStream(streamName2);
    copy.getRecord().setNamespace(getDefaultNamespace());

    final ObjectNode data = (ObjectNode) copy.getRecord().getData();
    data.remove(COL_UPDATED_AT);
    data.replace(COL_ID, convertIdBasedOnDatabase(data.get(COL_ID).asInt()));

    secondStreamMessages.add(copy);
  }
  return secondStreamMessages;
}

@Test
void testTablesWithQuoting() throws Exception {
final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces();
Expand All @@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception {

setEmittedAtToNull(actualMessages);

final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces);
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
expectedMessages.addAll(secondStreamExpectedMessages);

assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
Expand All @@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception {
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
expectedMessages.addAll(secondStreamExpectedMessages);

assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand Down Expand Up @@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception {
void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {
  // The table must exist before the expected records are derived from its stream.
  final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces();

  // The same column is passed as both cursor-field arguments; "patent" and "vash" are the
  // two cursor values handed to incrementalCursorCheck.
  incrementalCursorCheck(
      COL_LAST_NAME_WITH_SPACE,
      COL_LAST_NAME_WITH_SPACE,
      "patent",
      "vash",
      getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces),
      streamWithSpaces);
}

protected ArrayList<AirbyteMessage> getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) {
final AirbyteMessage firstMessage = getTestMessages().get(0);
firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName());
((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT);
Expand All @@ -546,21 +571,15 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {

Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2));

incrementalCursorCheck(
COL_LAST_NAME_WITH_SPACE,
COL_LAST_NAME_WITH_SPACE,
"patent",
"vash",
Lists.newArrayList(firstMessage, secondMessage),
streamWithSpaces);
return Lists.newArrayList(firstMessage, secondMessage);
}

@Test
void testIncrementalTimestampCheckCursor() throws Exception {
incrementalTimestampCheck();
void testIncrementalDateCheckCursor() throws Exception {
incrementalDateCheck();
}

protected void incrementalTimestampCheck() throws Exception {
protected void incrementalDateCheck() throws Exception {
incrementalCursorCheck(
COL_UPDATED_AT,
"2005-10-18T00:00:00Z",
Expand Down Expand Up @@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
.filter(r -> r.getType() == Type.STATE).findFirst();
assertTrue(stateAfterFirstSyncOptional.isPresent());

database.execute(connection -> {
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
});
executeStatementReadIncrementallyTwice();

final List<AirbyteMessage> actualMessagesSecondSync = MoreIterators
.toList(source.read(config, configuredCatalog,
Expand All @@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception {
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
}

/**
 * Inserts two additional rows (ids 4 and 5, both with {@code updated_at} 2006-10-19) into
 * {@code TABLE_NAME}.
 *
 * <p>Declared {@code protected} so that subclasses can supply their own insert statements.
 *
 * @throws SQLException if the database rejects either insert
 */
protected void executeStatementReadIncrementallyTwice() throws SQLException {
  database.execute(connection -> {
    final String tableName = getFullyQualifiedTableName(TABLE_NAME);
    // One statement serves both inserts; try-with-resources closes it even when an insert
    // fails, instead of leaking a statement per insert.
    try (final var statement = connection.createStatement()) {
      statement.execute(
          String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", tableName));
      statement.execute(
          String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", tableName));
    }
  });
}

protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
Expand Down Expand Up @@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception {

// we know the second stream's messages are the same as the first's minus the updated_at column,
// so we cheat and generate the expected messages off of the first expected messages.
final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2);
final List<AirbyteMessage> expectedMessagesFirstSync = new ArrayList<>(getTestMessages());
expectedMessagesFirstSync.add(new AirbyteMessage()
.withType(Type.STATE)
Expand Down Expand Up @@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception {
assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync));
}

/**
 * Derives the records expected for a second stream from the shared test messages.
 *
 * <p>Each test message is cloned, renamed to {@code streamName2}, stripped of
 * {@code COL_UPDATED_AT}, and has its {@code COL_ID} value replaced by the result of
 * {@code convertIdBasedOnDatabase}. The namespace of the cloned record is left untouched.
 *
 * @param streamName2 stream name to set on every returned record
 * @return the adjusted copies, in the same order as {@code getTestMessages()}
 */
protected List<AirbyteMessage> getAirbyteMessagesSecondStreamWithNamespace(String streamName2) {
  return getTestMessages()
      .stream()
      .map(original -> {
        // Clone first so the shared test messages stay unmodified.
        final AirbyteMessage copy = Jsons.clone(original);
        copy.getRecord().setStream(streamName2);

        final ObjectNode data = (ObjectNode) copy.getRecord().getData();
        data.remove(COL_UPDATED_AT);
        data.replace(COL_ID, convertIdBasedOnDatabase(data.get(COL_ID).asInt()));
        return copy;
      })
      .collect(Collectors.toList());
}

// when initial and final cursor fields are the same.
protected void incrementalCursorCheck(
final String cursorField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
}

@Override
protected void incrementalTimestampCheck() throws Exception {
protected void incrementalDateCheck() throws Exception {
super.incrementalCursorCheck(COL_UPDATED_AT,
"2005-10-18",
"2006-10-19",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.21
LABEL io.airbyte.version=0.4.22
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -30,6 +29,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,15 +80,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
Date date = Date.valueOf(value);
preparedStatement.setDate(parameterIndex, date);
} catch (final Exception e) {
throw new RuntimeException(e);
public void setStatementField(final PreparedStatement preparedStatement,
                              final int parameterIndex,
                              final JDBCType cursorFieldType,
                              final String value)
    throws SQLException {
  // Binds the string form of a cursor value to the statement parameter by dispatching on the
  // cursor column's JDBC type. Every arm forwards the same three arguments to a type-specific
  // setter; the arms are independent, so they are grouped by type family.
  switch (cursorFieldType) {
    // character data
    case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value);

    // binary data
    case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value);

    // bit and boolean
    case BIT -> setBit(preparedStatement, parameterIndex, value);
    case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value);

    // integral numbers
    case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value);
    case INTEGER -> setInteger(preparedStatement, parameterIndex, value);
    case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value);

    // floating-point and fixed-point numbers
    case REAL -> setReal(preparedStatement, parameterIndex, value);
    case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value);
    case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value);

    // dates and times, with and without a zone offset
    case DATE -> setDate(preparedStatement, parameterIndex, value);
    case TIME -> setTime(preparedStatement, parameterIndex, value);
    case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value);
    case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
    case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value);

    // Cursors are expected to be comparable, so cursor typing is handled strictly: any
    // unrecognized type is an error.
    default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType));
  }
}

/**
 * Binds {@code value} to the statement parameter as an {@link OffsetTime}.
 *
 * @param preparedStatement statement that receives the parameter
 * @param parameterIndex index passed to {@code setObject}
 * @param value text parsed with {@link OffsetTime#parse(CharSequence)}
 * @throws SQLException if {@code setObject} fails
 */
private void setTimeWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
    throws SQLException {
  final OffsetTime offsetTime = OffsetTime.parse(value);
  preparedStatement.setObject(parameterIndex, offsetTime);
}

/**
 * Binds {@code value} to the statement parameter as an {@link OffsetDateTime}.
 *
 * @param preparedStatement statement that receives the parameter
 * @param parameterIndex index passed to {@code setObject}
 * @param value text parsed with {@link OffsetDateTime#parse(CharSequence)}
 * @throws SQLException if {@code setObject} fails
 */
private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
    throws SQLException {
  final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value);
  preparedStatement.setObject(parameterIndex, offsetDateTime);
}

/**
 * Binds {@code value} to the statement parameter as a {@link LocalDateTime}.
 *
 * @param preparedStatement statement that receives the parameter
 * @param parameterIndex index passed to {@code setObject}
 * @param value text parsed with {@link LocalDateTime#parse(CharSequence)}
 * @throws SQLException if {@code setObject} fails
 */
@Override
protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
    throws SQLException {
  final LocalDateTime localDateTime = LocalDateTime.parse(value);
  preparedStatement.setObject(parameterIndex, localDateTime);
}

/**
 * Binds {@code value} to the statement parameter as a {@link LocalTime}.
 *
 * @param preparedStatement statement that receives the parameter
 * @param parameterIndex index passed to {@code setObject}
 * @param value text parsed with {@link LocalTime#parse(CharSequence)}
 * @throws SQLException if {@code setObject} fails
 */
@Override
protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
    throws SQLException {
  final LocalTime localTime = LocalTime.parse(value);
  preparedStatement.setObject(parameterIndex, localTime);
}

/**
 * Binds {@code value} to the statement parameter as a {@link LocalDate}.
 *
 * @param preparedStatement statement that receives the parameter
 * @param parameterIndex index passed to {@code setObject}
 * @param value text parsed with {@link LocalDate#parse(CharSequence)}
 * @throws SQLException if {@code setObject} fails
 */
@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
    throws SQLException {
  final LocalDate localDate = LocalDate.parse(value);
  preparedStatement.setObject(parameterIndex, localDate);
}

@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
Expand Down
Loading