diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java index 7b6032061ec77..f7acac0f0f5f2 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java @@ -40,13 +40,9 @@ public JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final return Jsons.jsonNode(builderWithSchema .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() - .put("tunnel_host", - Objects.requireNonNull(bastion.getContainerInfo().getNetworkSettings() - .getNetworks() - .get(((Network.NetworkImpl) network).getName()) - .getIpAddress())) + .put("tunnel_host", bastion.getHost()) .put("tunnel_method", tunnelMethod) - .put("tunnel_port", bastion.getExposedPorts().get(0)) + .put("tunnel_port", bastion.getFirstMappedPort()) .put("tunnel_user", SSH_USER) .put("tunnel_user_password", tunnelMethod.equals(SSH_PASSWORD_AUTH) ? SSH_PASSWORD : "") .put("ssh_key", tunnelMethod.equals(SSH_KEY_AUTH) ? bastion.execInContainer("cat", "var/bastion/id_rsa").getStdout() : "") diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java index 9f26a25320395..633e9715f59cc 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; +import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; @@ -25,7 +26,6 @@ import io.airbyte.protocol.models.SyncMode; import java.util.HashMap; import java.util.List; -import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; @@ -33,9 +33,37 @@ public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcce private static final String STREAM_NAME = "public.id_and_name"; private static final String STREAM_NAME2 = "public.starships"; - private PostgreSQLContainer db; - private final SshBastionContainer bastion = new SshBastionContainer(); private static JsonNode config; + private final SshBastionContainer bastion = new SshBastionContainer(); + private PostgreSQLContainer db; + + private static void populateDatabaseTestData() throws Exception { + SshTunnel.sshWrap( + config, + List.of("host"), + List.of("port"), + (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) + .query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + })); + } + + private static Database getDatabaseFromConfig(final JsonNode config) { + return new Database( + DSLContextFactory.create( + config.get("username").asText(), + config.get("password").asText(), + DatabaseDriver.POSTGRESQL.getDriverClassName(), + String.format(DatabaseDriver.POSTGRESQL.getUrlFormatString(), + config.get("host").asText(), + config.get("port").asInt(), + config.get("database").asText()), + SQLDialect.POSTGRES)); + } public abstract SshTunnel.TunnelMethod getTunnelMethod(); @@ -59,28 +87,6 @@ private void initAndStartJdbcContainer() { db.start(); } - private static void populateDatabaseTestData() throws Exception { - try (final DSLContext dslContext = DSLContextFactory.create( - config.get("username").asText(), - config.get("password").asText(), - DatabaseDriver.POSTGRESQL.getDriverClassName(), - String.format(DatabaseDriver.POSTGRESQL.getUrlFormatString(), - config.get("host").asText(), - config.get("port").asInt(), - config.get("database").asText()), - SQLDialect.POSTGRES)) { - final Database database = new Database(dslContext); - - database.query(ctx -> { - ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); - ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); - ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); - ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); - return null; - }); - } - } - @Override protected void tearDown(final TestDestinationEnv testEnv) { bastion.stopAndCloseContainers(db); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 2a5b46975c8bf..6d2caa0674204 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -317,4 +317,5 @@ public void testRecordsProducedDuringAndAfterSync() throws Exception { recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates .size()); } + }