Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres Source: add timezone awareness and handle BC dates #13166

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.18
dockerImageTag: 0.4.19
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6700,7 +6700,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.18"
- dockerImage: "airbyte/source-postgres:0.4.19"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
Expand Down Expand Up @@ -246,4 +250,27 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}

protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
node.put(columnName, timetz.toString());
}

protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
LocalDate localDate = timestamptz.toLocalDate();
node.put(columnName, resolveEra(localDate, timestamptz.toString()));
}

protected String resolveEra(LocalDate date, String value) {
return isBCE(date) ? value.substring(1) + " BC" : value;
}

public static boolean isBCE(LocalDate date) {
return date.getEra().equals(IsoEra.BCE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {

@Test
void testIncrementalTimestampCheckCursor() throws Exception {
incrementalTimestampCheck();
}

protected void incrementalTimestampCheck() throws Exception {
incrementalCursorCheck(
COL_UPDATED_AT,
"2005-10-18T00:00:00Z",
Expand Down Expand Up @@ -611,6 +615,16 @@ void testReadOneTableIncrementallyTwice() throws Exception {

assertEquals(2,
(int) actualMessagesSecondSync.stream().filter(r -> r.getType() == Type.RECORD).count());
final List<AirbyteMessage> expectedMessages = getExpectedAirbyteMessagesSecondSync(namespace);

setEmittedAtToNull(actualMessagesSecondSync);

assertTrue(expectedMessages.size() == actualMessagesSecondSync.size());
assertTrue(expectedMessages.containsAll(actualMessagesSecondSync));
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
Expand All @@ -634,12 +648,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
.withStreamNamespace(namespace)
.withCursorField(ImmutableList.of(COL_ID))
.withCursor("5")))))));

setEmittedAtToNull(actualMessagesSecondSync);

assertTrue(expectedMessages.size() == actualMessagesSecondSync.size());
assertTrue(expectedMessages.containsAll(actualMessagesSecondSync));
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
return expectedMessages;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.15
LABEL io.airbyte.version=0.4.16
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,33 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.jdbc.JdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -93,6 +109,102 @@ static void cleanUp() {
PSQL_DB.close();
}

@Override
protected List<AirbyteMessage> getTestMessages() {
return Lists.newArrayList(
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_1,
COL_NAME, "picard",
COL_UPDATED_AT, "2004-10-19")))),
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_2,
COL_NAME, "crusher",
COL_UPDATED_AT,
"2005-10-19")))),
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_3,
COL_NAME, "vash",
COL_UPDATED_AT, "2006-10-19")))));
}

@Override
protected AirbyteCatalog getCatalog(final String defaultNamespace) {
return new AirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

@Override
protected void incrementalTimestampCheck() throws Exception {
super.incrementalCursorCheck(COL_UPDATED_AT,
"2005-10-18",
"2006-10-19",
Lists.newArrayList(getTestMessages().get(1),
getTestMessages().get(2)));
}

@Override
protected JdbcSourceOperations getSourceOperations() {
return new PostgresSourceOperations();
}

@Override
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_4,
COL_NAME, "riker",
COL_UPDATED_AT, "2006-10-19")))));
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_5,
COL_NAME, "data",
COL_UPDATED_AT, "2006-10-19")))));
expectedMessages.add(new AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage()
.withData(Jsons.jsonNode(new DbState()
.withCdc(false)
.withStreams(Lists.newArrayList(new DbStreamState()
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(ImmutableList.of(COL_ID))
.withCursor("5")))))));
return expectedMessages;
}

@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = source.spec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@
"airbyte_secret": true,
"order": 5
},
"jdbc_url_params": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this change in expected_spec.json is related to this PR. How did it end up here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this change in expected_spec.json is related to this PR. How did it end up here?

Postgres Source Strict encrypt tests were broken in master branch, so i decided to fix them in scope of this PR

"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
},
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "Replication method to use for extracting data from the database.",
"order": 7,
"order": 8,
"oneOf": [
{
"title": "Standard",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.18
LABEL io.airbyte.version=0.4.19
LABEL io.airbyte.name=airbyte/source-postgres
Loading