Skip to content

Commit

Permalink
Postgres Source: use native Postgres timeout if it's not set by the u…
Browse files Browse the repository at this point in the history
…ser (#19291)

* Postgres Source: use native Postgres timeout if it's not set by the user

* refactoring

* updated connection timeout logic and added tests for different datasources creation

* fixed pmd

* refactoring

* bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
VitaliiMaltsev and octavia-squidington-iii authored Nov 16, 2022
1 parent 693f976 commit a1db24c
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.23
dockerImageTag: 1.0.24
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
40 changes: 8 additions & 32 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10873,7 +10873,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.23"
- dockerImage: "airbyte/source-postgres:1.0.24"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -10963,59 +10963,47 @@
order: 7
oneOf:
- title: "disable"
additionalProperties: false
additionalProperties: true
description: "Disable SSL."
required:
- "mode"
properties:
mode:
type: "string"
const: "disable"
enum:
- "disable"
default: "disable"
order: 0
- title: "allow"
additionalProperties: false
additionalProperties: true
description: "Allow SSL mode."
required:
- "mode"
properties:
mode:
type: "string"
const: "allow"
enum:
- "allow"
default: "allow"
order: 0
- title: "prefer"
additionalProperties: false
additionalProperties: true
description: "Prefer SSL mode."
required:
- "mode"
properties:
mode:
type: "string"
const: "prefer"
enum:
- "prefer"
default: "prefer"
order: 0
- title: "require"
additionalProperties: false
additionalProperties: true
description: "Require SSL mode."
required:
- "mode"
properties:
mode:
type: "string"
const: "require"
enum:
- "require"
default: "require"
order: 0
- title: "verify-ca"
additionalProperties: false
additionalProperties: true
description: "Verify-ca SSL mode."
required:
- "mode"
Expand All @@ -11024,9 +11012,6 @@
mode:
type: "string"
const: "verify-ca"
enum:
- "verify-ca"
default: "verify-ca"
order: 0
ca_certificate:
type: "string"
Expand Down Expand Up @@ -11057,7 +11042,7 @@
airbyte_secret: true
order: 4
- title: "verify-full"
additionalProperties: false
additionalProperties: true
description: "Verify-full SSL mode."
required:
- "mode"
Expand All @@ -11066,9 +11051,6 @@
mode:
type: "string"
const: "verify-full"
enum:
- "verify-full"
default: "verify-full"
order: 0
ca_certificate:
type: "string"
Expand Down Expand Up @@ -11113,9 +11095,6 @@
method:
type: "string"
const: "Standard"
enum:
- "Standard"
default: "Standard"
order: 0
- title: "Logical Replication (CDC)"
description: "Logical replication uses the Postgres write-ahead log (WAL)\
Expand All @@ -11131,9 +11110,6 @@
method:
type: "string"
const: "CDC"
enum:
- "CDC"
default: "CDC"
order: 0
plugin:
type: "string"
Expand All @@ -11146,7 +11122,7 @@
enum:
- "pgoutput"
- "wal2json"
default: "pgoutput"
const: "pgoutput"
order: 1
replication_slot:
type: "string"
Expand Down
4 changes: 4 additions & 0 deletions airbyte-db/db-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ dependencies {

// MongoDB
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'

// MySQL
implementation 'mysql:mysql-connector-java:8.0.30'

}

task(newConfigsMigration, dependsOn: 'classes', type: JavaExec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

package io.airbyte.db.factory;

import static org.postgresql.PGProperty.CONNECT_TIMEOUT;

import com.google.common.base.Preconditions;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import javax.sql.DataSource;

/**
Expand Down Expand Up @@ -61,7 +64,7 @@ public static DataSource create(final String username,
.withJdbcUrl(jdbcConnectionString)
.withPassword(password)
.withUsername(username)
.withConnectionTimeoutMs(DataSourceBuilder.getConnectionTimeoutMs(connectionProperties))
.withConnectionTimeoutMs(DataSourceBuilder.getConnectionTimeoutMs(connectionProperties, driverClassName))
.build();
}

Expand Down Expand Up @@ -196,12 +199,23 @@ private DataSourceBuilder() {}
*
* @param connectionProperties custom jdbc_url_parameters containing information on connection
* properties
* @param driverClassName name of the JDBC driver
* @return DataSourceBuilder class used to create dynamic fields for DataSource
*/
private static long getConnectionTimeoutMs(final Map<String, String> connectionProperties) {
private static long getConnectionTimeoutMs(final Map<String, String> connectionProperties, String driverClassName) {
// TODO: the usage of CONNECT_TIMEOUT is Postgres specific, may need to extend for other databases
if (driverClassName.equals(DatabaseDriver.POSTGRESQL.getDriverClassName())) {
final String pgPropertyConnectTimeout = CONNECT_TIMEOUT.getName();
// If the PGProperty.CONNECT_TIMEOUT was set by the user, then take its value, if not take the
// default
if (connectionProperties.containsKey(pgPropertyConnectTimeout)
&& (Long.parseLong(connectionProperties.get(pgPropertyConnectTimeout)) >= 0)) {
return Duration.ofSeconds(Long.parseLong(connectionProperties.get(pgPropertyConnectTimeout))).toMillis();
} else {
return Duration.ofSeconds(Long.parseLong(Objects.requireNonNull(CONNECT_TIMEOUT.getDefaultValue()))).toMillis();
}
}
final Duration connectionTimeout;
// TODO: the usage of CONNECT_TIMEOUT_KEY is Postgres specific, may need to extend for other
// databases
connectionTimeout =
connectionProperties.containsKey(CONNECT_TIMEOUT_KEY) ? Duration.ofSeconds(Long.parseLong(connectionProperties.get(CONNECT_TIMEOUT_KEY)))
: CONNECT_TIMEOUT_DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

/**
* Test suite for the {@link DataSourceFactory} class.
*/
class DataSourceFactoryTest extends CommonFactoryTest {

private static final String CONNECT_TIMEOUT = "connectTimeout";

static String database;
static String driverClassName;
static String host;
Expand All @@ -45,7 +48,7 @@ static void setup() {
@Test
void testCreatingDataSourceWithConnectionTimeoutSetAboveDefault() {
final Map<String, String> connectionProperties = Map.of(
"connectTimeout", "61");
CONNECT_TIMEOUT, "61");
final DataSource dataSource = DataSourceFactory.create(
username,
password,
Expand All @@ -58,9 +61,9 @@ void testCreatingDataSourceWithConnectionTimeoutSetAboveDefault() {
}

@Test
void testCreatingDataSourceWithConnectionTimeoutSetBelowDefault() {
void testCreatingPostgresDataSourceWithConnectionTimeoutSetBelowDefault() {
final Map<String, String> connectionProperties = Map.of(
"connectTimeout", "30");
CONNECT_TIMEOUT, "30");
final DataSource dataSource = DataSourceFactory.create(
username,
password,
Expand All @@ -69,13 +72,31 @@ void testCreatingDataSourceWithConnectionTimeoutSetBelowDefault() {
connectionProperties);
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(60000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
assertEquals(30000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
}

@Test
void testCreatingMySQLDataSourceWithConnectionTimeoutSetBelowDefault() {
try (MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:8.0")) {
mySQLContainer.start();
final Map<String, String> connectionProperties = Map.of(
CONNECT_TIMEOUT, "30");
final DataSource dataSource = DataSourceFactory.create(
mySQLContainer.getUsername(),
mySQLContainer.getPassword(),
mySQLContainer.getDriverClassName(),
mySQLContainer.getJdbcUrl(),
connectionProperties);
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(60000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
}
}

@Test
void testCreatingDataSourceWithConnectionTimeoutSetWithZero() {
final Map<String, String> connectionProperties = Map.of(
"connectTimeout", "0");
CONNECT_TIMEOUT, "0");
final DataSource dataSource = DataSourceFactory.create(
username,
password,
Expand All @@ -88,7 +109,7 @@ void testCreatingDataSourceWithConnectionTimeoutSetWithZero() {
}

@Test
void testCreatingDataSourceWithConnectionTimeoutNotSet() {
void testCreatingPostgresDataSourceWithConnectionTimeoutNotSet() {
final Map<String, String> connectionProperties = Map.of();
final DataSource dataSource = DataSourceFactory.create(
username,
Expand All @@ -98,7 +119,25 @@ void testCreatingDataSourceWithConnectionTimeoutNotSet() {
connectionProperties);
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(60000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
assertEquals(10000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
}

@Test
void testCreatingMySQLDataSourceWithConnectionTimeoutNotSet() {
try (MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:8.0")) {
mySQLContainer.start();
final Map<String, String> connectionProperties = Map.of();
final DataSource dataSource = DataSourceFactory.create(
mySQLContainer.getUsername(),
mySQLContainer.getPassword(),
mySQLContainer.getDriverClassName(),
mySQLContainer.getJdbcUrl(),
connectionProperties);
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(60000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.23
LABEL io.airbyte.version=1.0.24
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.23
LABEL io.airbyte.version=1.0.24
LABEL io.airbyte.name=airbyte/source-postgres
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.24 | 2022-11-07 | [19291](https://github.com/airbytehq/airbyte/pull/19291) | Default timeout is reduced from 1 min to 10sec |
| 1.0.23 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |
| 1.0.22 | 2022-10-31 | [18538](https://github.com/airbytehq/airbyte/pull/18538) | Encode database name |
| 1.0.21 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
Expand Down

0 comments on commit a1db24c

Please sign in to comment.