Skip to content

Commit

Permalink
Destination Postgres : Enable DAT and fix the data fetch. (#12543)
Browse files Browse the repository at this point in the history
* Enable DAT for Postgres and fix the data fetch.
Move JDBC abstract part for tests to the JdbcDestinationAcceptanceTest.java

* Remove unnecessary deserialization + add jsonb to json transformation.

* Remove unnecessary deserialization from ssh
  • Loading branch information
DoNotPanicUA authored and suhomud committed May 23, 2022
1 parent 38390e5 commit 781be94
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
id 'java-library'
}
dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import java.util.Arrays;
import org.jooq.Record;

public abstract class JdbcDestinationAcceptanceTest extends DestinationAcceptanceTest {

protected final ObjectMapper mapper = new ObjectMapper();

protected JsonNode getJsonFromRecord(Record record) {
ObjectNode node = mapper.createObjectNode();

Arrays.stream(record.fields()).forEach(field -> {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "jsonb", "other":
var stringValue = (value != null ? value.toString() : null);
if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$")
|| stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) {
node.set(field.getName(), Jsons.deserialize(stringValue));
} else {
node.put(field.getName(), stringValue);
}
break;
default:
node.put(field.getName(), (value != null ? value.toString() : null));
}
});
return node;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresDestinationAcceptanceTest extends DestinationAcceptanceTest {
public class PostgresDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private PostgreSQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
Expand Down Expand Up @@ -62,7 +61,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -81,41 +80,42 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new PostgresTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
throws Exception {
final String tableName = namingResolver.getIdentifier(streamName);
// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
// use quoted names
// if (!tableName.startsWith("\"")) {
// // Currently, Normalization always quote tables identifiers
// //tableName = "\"" + tableName + "\"";
// }
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(),
db.getJdbcUrl()).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
db.getJdbcUrl()).query(ctx -> {
ctx.execute("set time zone 'UTC';");
return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres;

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class PostgresTestDataComparator extends AdvancedTestDataComparator {

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private static final String POSTGRES_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
private static final String POSTGRES_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private LocalDate parseLocalDate(String dateTimeValue) {
if (dateTimeValue != null) {
var format = (dateTimeValue.matches(".+Z") ? POSTGRES_DATETIME_FORMAT : AIRBYTE_DATETIME_FORMAT);
return LocalDate.parse(dateTimeValue, DateTimeFormatter.ofPattern(format));
} else {
return null;
}
}

@Override
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
var destinationDate = parseLocalDate(actualValue);
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
return expectedDate.equals(destinationDate);
}

@Override
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(POSTGRES_DATETIME_WITH_TZ_FORMAT)), ZoneOffset.UTC);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.util.ArrayList;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -29,7 +28,7 @@
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshPostgresDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class SshPostgresDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private static final String schemaName = RandomStringUtils.randomAlphabetic(8).toLowerCase();
Expand Down Expand Up @@ -63,7 +62,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -82,32 +81,33 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new PostgresTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
throws Exception {
final String tableName = namingResolver.getIdentifier(streamName);
// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
// use quoted names
// if (!tableName.startsWith("\"")) {
// // Currently, Normalization always quote tables identifiers
// //tableName = "\"" + tableName + "\"";
// }
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private static Database getDatabaseFromConfig(final JsonNode config) {
return Databases.createPostgresDatabase(
config.get("username").asText(),
Expand All @@ -123,13 +123,13 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
PostgresDestination.HOST_KEY,
PostgresDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
.query(ctx -> {
ctx.execute("set time zone 'UTC';");
return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Integration test testing {@link RedshiftCopyS3Destination}. The default Redshift integration test
* credentials contain S3 credentials - this automatically causes COPY to be selected.
*/
public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptanceTest {
public class RedshiftCopyDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftCopyDestinationAcceptanceTest.class);

Expand Down Expand Up @@ -121,29 +119,6 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
return retrieveRecordsFromTable(tableName, namespace);
}

private JsonNode getJsonFromRecord(Record record) {
ObjectNode node = mapper.createObjectNode();

Arrays.stream(record.fields()).forEach(field -> {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "other":
var stringValue = (value != null ? value.toString() : null);
if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$")
|| stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) {
node.set(field.getName(), Jsons.deserialize(stringValue));
} else {
node.put(field.getName(), stringValue);
}
break;
default:
node.put(field.getName(), (value != null ? value.toString() : null));
}
});
return node;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
return getDatabase().query(
ctx -> ctx
Expand Down

0 comments on commit 781be94

Please sign in to comment.