From 7afc28ac5666d26f60f5c82ce81fe2f486cea01d Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Fri, 17 Jun 2022 16:07:32 +0700 Subject: [PATCH 1/2] 13546 Fix integration tests source-postgres Mac OS --- .../base/ssh/SshBastionContainer.java | 8 +-- ...stractSshPostgresSourceAcceptanceTest.java | 68 ++++++++++--------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java index 7b6032061ec77..f7acac0f0f5f2 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java @@ -40,13 +40,9 @@ public JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final return Jsons.jsonNode(builderWithSchema .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() - .put("tunnel_host", - Objects.requireNonNull(bastion.getContainerInfo().getNetworkSettings() - .getNetworks() - .get(((Network.NetworkImpl) network).getName()) - .getIpAddress())) + .put("tunnel_host", bastion.getHost()) .put("tunnel_method", tunnelMethod) - .put("tunnel_port", bastion.getExposedPorts().get(0)) + .put("tunnel_port", bastion.getFirstMappedPort()) .put("tunnel_user", SSH_USER) .put("tunnel_user_password", tunnelMethod.equals(SSH_PASSWORD_AUTH) ? SSH_PASSWORD : "") .put("ssh_key", tunnelMethod.equals(SSH_KEY_AUTH) ? bastion.execInContainer("cat", "var/bastion/id_rsa").getStdout() : "") diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java index 9f26a25320395..9fd348e58c047 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; +import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; @@ -25,7 +26,6 @@ import io.airbyte.protocol.models.SyncMode; import java.util.HashMap; import java.util.List; -import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; @@ -33,9 +33,37 @@ public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcce private static final String STREAM_NAME = "public.id_and_name"; private static final String STREAM_NAME2 = "public.starships"; - private PostgreSQLContainer db; - private final SshBastionContainer bastion = new SshBastionContainer(); private static JsonNode config; + private final SshBastionContainer bastion = new SshBastionContainer(); + private PostgreSQLContainer db; + + private static void populateDatabaseTestData() throws Exception { + SshTunnel.sshWrap( + config, + List.of("host"), + List.of("port"), + (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) + .query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + })); + } + + private static Database getDatabaseFromConfig(final JsonNode config) { + return new Database( + DSLContextFactory.create( + config.get("username").asText(), + config.get("password").asText(), + DatabaseDriver.POSTGRESQL.getDriverClassName(), + String.format(DatabaseDriver.POSTGRESQL.getUrlFormatString(), + config.get("host").asText(), + config.get("port").asInt(), + config.get("database").asText()), + SQLDialect.POSTGRES)); + } public abstract SshTunnel.TunnelMethod getTunnelMethod(); @@ -59,28 +87,6 @@ private void initAndStartJdbcContainer() { db.start(); } - private static void populateDatabaseTestData() throws Exception { - try (final DSLContext dslContext = DSLContextFactory.create( - config.get("username").asText(), - config.get("password").asText(), - DatabaseDriver.POSTGRESQL.getDriverClassName(), - String.format(DatabaseDriver.POSTGRESQL.getUrlFormatString(), - config.get("host").asText(), - config.get("port").asInt(), - config.get("database").asText()), - SQLDialect.POSTGRES)) { - final Database database = new Database(dslContext); - - database.query(ctx -> { - ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); - ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); - ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); - ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); - return null; - }); - } - } - @Override protected void tearDown(final TestDestinationEnv testEnv) { bastion.stopAndCloseContainers(db); @@ -109,18 +115,18 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - STREAM_NAME, - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING)) + STREAM_NAME, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - STREAM_NAME2, - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING)) + STREAM_NAME2, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } From bb187bd6358fa3f4b679d14f864c3d5dd4c73aac Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Fri, 17 Jun 2022 21:10:40 +0700 Subject: [PATCH 2/2] 13546 Fixed formatting --- .../AbstractSshPostgresSourceAcceptanceTest.java | 12 ++++++------ .../source/postgres/CdcPostgresSourceTest.java | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java index 9fd348e58c047..633e9715f59cc 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java @@ -115,18 +115,18 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - STREAM_NAME, - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING)) + STREAM_NAME, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - STREAM_NAME2, - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING)) + STREAM_NAME2, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 2a5b46975c8bf..6d2caa0674204 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -317,4 +317,5 @@ public void testRecordsProducedDuringAndAfterSync() throws Exception { recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates .size()); } + }