diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java index 917e03da3b98e..432539edd3737 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; @@ -41,15 +42,18 @@ public class AirbyteDebeziumHandler { private final CdcTargetPosition targetPosition; private final boolean trackSchemaHistory; private final Duration firstRecordWaitTime; + private final OptionalInt queueSize; public AirbyteDebeziumHandler(final JsonNode config, final CdcTargetPosition targetPosition, final boolean trackSchemaHistory, - final Duration firstRecordWaitTime) { + final Duration firstRecordWaitTime, + final OptionalInt queueSize) { this.config = config; this.targetPosition = targetPosition; this.trackSchemaHistory = trackSchemaHistory; this.firstRecordWaitTime = firstRecordWaitTime; + this.queueSize = queueSize; } public AutoCloseableIterator getSnapshotIterators( @@ -60,7 +64,7 @@ public AutoCloseableIterator getSnapshotIterators( final Instant emittedAt) { LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables"); - final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY)); final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose(); final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties, @@ -93,7 +97,7 @@ public AutoCloseableIterator getIncrementalIterators(final Confi final Instant emittedAt, final boolean addDbNameToState) { LOGGER.info("Using CDC: {}", true); - final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY)); final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset(), addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty()); final Optional schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher); diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java index da87c360c8f73..7018cbed78f41 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java @@ -22,6 +22,7 @@ public class DebeziumPropertiesManager { private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumPropertiesManager.class); + private static final String BYTE_VALUE_256_MB = Integer.toString(256 * 1024 * 1024); private final JsonNode config; private final AirbyteFileOffsetBackingStore offsetManager; private final Optional schemaHistoryManager; @@ -46,7 +47,7 @@ public Properties getDebeziumProperties() { props.putAll(properties); // debezium engine configuration - // https://debezium.io/documentation/reference/2.1/development/engine.html#engine-properties + // https://debezium.io/documentation/reference/2.2/development/engine.html#engine-properties props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer @@ -54,14 +55,17 @@ public Properties getDebeziumProperties() { props.setProperty("max.batch.size", "2048"); props.setProperty("max.queue.size", "8192"); - props.setProperty("errors.max.retries", "10"); + // Disabling retries because debezium startup time might exceed our 60-second wait limit + // The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > + // 0 = num of retries). + props.setProperty("errors.max.retries", "0"); // This property must be strictly less than errors.retry.delay.max.ms // (https://github.com/debezium/debezium/blob/bcc7d49519a4f07d123c616cfa45cd6268def0b9/debezium-core/src/main/java/io/debezium/util/DelayStrategy.java#L135) props.setProperty("errors.retry.delay.initial.ms", "299"); props.setProperty("errors.retry.delay.max.ms", "300"); if (schemaHistoryManager.isPresent()) { - // https://debezium.io/documentation/reference/2.1/operations/debezium-server.html#debezium-source-database-history-class + // https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class // https://debezium.io/documentation/reference/development/engine.html#_in_the_code // As mentioned in the documents above, debezium connector for MySQL needs to track the schema // changes. If we don't do this, we can't fetch records for the table. @@ -69,7 +73,7 @@ public Properties getDebeziumProperties() { props.setProperty("schema.history.internal.file.filename", schemaHistoryManager.get().getPath().toString()); } - // https://debezium.io/documentation/reference/2.1/configuration/avro.html + // https://debezium.io/documentation/reference/2.2/configuration/avro.html props.setProperty("key.converter.schemas.enable", "false"); props.setProperty("value.converter.schemas.enable", "false"); @@ -89,10 +93,13 @@ public Properties getDebeziumProperties() { // By default "decimal.handing.mode=precise" which's caused returning this value as a binary. // The "double" type may cause a loss of precision, so set Debezium's config to store it as a String // explicitly in its Kafka messages for more details see: - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-decimal-types + // https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-decimal-types // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation props.setProperty("decimal.handling.mode", "string"); + // https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-property-max-queue-size-in-bytes + props.setProperty("max.queue.size.in.bytes", BYTE_VALUE_256_MB); + // WARNING : Never change the value of this otherwise all the connectors would start syncing from // scratch props.setProperty("topic.prefix", config.get(JdbcUtils.DATABASE_KEY).asText()); diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java index 3bf84b5ba9276..2b1167db9b037 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java @@ -59,7 +59,7 @@ public void start(final BlockingQueue> queue) { // debezium outputs a tombstone event that has a value of null. this is an artifact of how it // interacts with kafka. we want to ignore it. // more on the tombstone: - // https://debezium.io/documentation/reference/2.1/transformations/event-flattening.html + // https://debezium.io/documentation/reference/2.2/transformations/event-flattening.html if (e.value() != null) { try { queue.put(e); diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/postgres/PostgresConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/postgres/PostgresConverter.java index 7fb41758e7b03..a260f93761ef5 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/postgres/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/postgres/PostgresConverter.java @@ -240,7 +240,7 @@ private int getTimePrecision(final RelationalColumn field) { } // Ref : - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-temporal-types + // https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-temporal-types private void registerDate(final RelationalColumn field, final ConverterRegistration registration) { final var fieldType = field.typeName(); diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile index 36c4325f02eb8..0b00a76e28326 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION source-mssql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.16 +LABEL io.airbyte.version=1.0.17 LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/source-mssql-strict-encrypt/metadata.yaml index 7b3ba757c5987..6024988b6faf0 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 1.0.16 + dockerImageTag: 1.0.17 dockerRepository: airbyte/source-mssql-strict-encrypt githubIssueLabel: source-mssql icon: mssql.svg diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 6f9053decfd3a..27a10869e785d 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.16 +LABEL io.airbyte.version=1.0.17 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index 09668c2a38e89..8d11811131fe9 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -6,7 +6,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 1.0.16 + dockerImageTag: 1.0.17 dockerRepository: airbyte/source-mssql githubIssueLabel: source-mssql icon: mssql.svg diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java index ba517ba46e34e..4d3a50fefaa8b 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java @@ -39,7 +39,7 @@ public enum ReplicationMethod { * The default "SNAPSHOT" mode can prevent other (non-Airbyte) transactions from updating table rows * while we snapshot. References: * https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15 - * https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode + * https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode */ public enum SnapshotIsolation { @@ -69,7 +69,7 @@ public static SnapshotIsolation from(final String jsonValue) { } - // https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-snapshot-mode + // https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-snapshot-mode public enum DataToSync { EXISTING_AND_NEW("Existing and New", "initial"), @@ -146,9 +146,9 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi final Properties props = new Properties(); props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector"); - // https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-include-schema-changes + // https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-include-schema-changes props.setProperty("include.schema.changes", "false"); - // https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata + // https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata props.setProperty("provide.transaction.metadata", "false"); props.setProperty("converters", "mssql_converter"); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 5e28ead79e822..0ae6c467c3c67 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Supplier; import org.slf4j.Logger; @@ -449,7 +450,7 @@ public List> getIncrementalIterators( final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig); final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, - MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()), true, firstRecordWaitTime); + MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()), true, firstRecordWaitTime, OptionalInt.empty()); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog, new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index 89a6c82184d7a..fe26c070409f5 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -24,6 +24,6 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.23 +LABEL io.airbyte.version=2.0.24 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml index d0968dde098e8..a9130805c00f9 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 2.0.23 + dockerImageTag: 2.0.24 dockerRepository: airbyte/source-mysql-strict-encrypt githubIssueLabel: source-mysql icon: mysql.svg diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index f436e7c92329e..21f94bd5485cd 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -24,6 +24,6 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.23 +LABEL io.airbyte.version=2.0.24 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index afe93a1604083..a099870b6c49c 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -6,7 +6,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 2.0.23 + dockerImageTag: 2.0.24 dockerRepository: airbyte/source-mysql githubIssueLabel: source-mysql icon: mysql.svg diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index 0893c75011502..3385812126c93 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -37,7 +37,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database) { // initial snapshot props.setProperty("snapshot.mode", sourceConfig.get("snapshot_mode").asText()); } else { - // https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-snapshot-mode + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode props.setProperty("snapshot.mode", "when_needed"); } @@ -52,8 +52,8 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); props.setProperty("database.server.id", String.valueOf(generateServerID())); - // https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-boolean-values - // https://debezium.io/documentation/reference/2.1/development/converters.html + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-boolean-values + // https://debezium.io/documentation/reference/2.2/development/converters.html /** * {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} * {@link MySQLConverter} @@ -74,7 +74,7 @@ private static Properties commonProperties(final JdbcDatabase database) { } // Check params for SSL connection in config and add properties for CDC SSL connection - // https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-database-ssl-mode + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-database-ssl-mode if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { if (dbConfig.has(SSL_MODE) && !dbConfig.get(SSL_MODE).asText().isEmpty()) { props.setProperty("database.ssl.mode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText()))); @@ -100,15 +100,15 @@ private static Properties commonProperties(final JdbcDatabase database) { } } - // https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-snapshot-locking-mode + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode // This is to make sure other database clients are allowed to write to a table while Airbyte is // taking a snapshot. There is a risk involved that // if any database client makes a schema change then the sync might break props.setProperty("snapshot.locking.mode", "none"); - // https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-include-schema-changes + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes props.setProperty("include.schema.changes", "false"); // This to make sure that binary data represented as a base64-encoded String. - // https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-binary-handling-mode + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode props.setProperty("binary.handling.mode", "base64"); props.setProperty("database.include.list", sourceConfig.get("database").asText()); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 7a27805d11385..b8c8799a09ac9 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -67,6 +67,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -274,7 +275,7 @@ public List> getIncrementalIterators(final final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig); LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler<>(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime); + new AirbyteDebeziumHandler<>(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime, OptionalInt.empty()); final MySqlCdcStateHandler mySqlCdcStateHandler = new MySqlCdcStateHandler(stateManager); final MySqlCdcConnectorMetadataInjector mySqlCdcConnectorMetadataInjector = new MySqlCdcConnectorMetadataInjector(); diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 7f5f531158b72..f26f8e6e5aec2 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.29 +LABEL io.airbyte.version=2.0.30 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml index 511e9e35bb740..7c049940adfa7 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml @@ -12,7 +12,7 @@ data: connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 maxSecondsBetweenMessages: 7200 - dockerImageTag: 2.0.29 + dockerImageTag: 2.0.30 dockerRepository: airbyte/source-postgres-strict-encrypt githubIssueLabel: source-postgres icon: postgresql.svg diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/resources/expected_strict_encrypt_spec.json b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/resources/expected_strict_encrypt_spec.json index 6f951ca4e5159..add556c58b41c 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/resources/expected_strict_encrypt_spec.json +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/resources/expected_strict_encrypt_spec.json @@ -281,6 +281,15 @@ "min": 120, "max": 1200 }, + "queue_size": { + "type": "integer", + "title": "Size of the queue (Advanced)", + "description": "The size of the internal queue. This may interfere with memory consumption and efficiency of the connector, please be careful.", + "default": 10000, + "order": 6, + "min": 1000, + "max": 10000 + }, "lsn_commit_behaviour": { "type": "string", "title": "LSN commit behaviour", @@ -290,7 +299,7 @@ "After loading Data in the destination" ], "default": "After loading Data in the destination", - "order": 6 + "order": 7 } } } diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index e5398da984f6c..3dbb3a533260e 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.29 +LABEL io.airbyte.version=2.0.30 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index ac25e2a6d6386..704b876f73636 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -6,7 +6,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 2.0.29 + dockerImageTag: 2.0.30 maxSecondsBetweenMessages: 7200 dockerRepository: airbyte/source-postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java index b17c80ee74f88..a83a260e5719d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java @@ -62,7 +62,7 @@ private static Properties commonProperties(final JdbcDatabase database) { } // Check params for SSL connection in config and add properties for CDC SSL connection - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-property-database-sslmode + // https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-property-database-sslmode if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { if (sourceConfig.has(JdbcUtils.SSL_MODE_KEY) && sourceConfig.get(JdbcUtils.SSL_MODE_KEY).has(JdbcUtils.MODE_KEY)) { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 2d6e138ebeb6f..19de784b9c93e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -85,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.function.Supplier; @@ -341,6 +342,10 @@ public List> getCheckOperations(final J PostgresUtils.checkFirstRecordWaitTime(config); }); + checkOperations.add(database -> { + PostgresUtils.checkQueueSize(config); + }); + // Verify that a CDC connection can be created checkOperations.add(database -> { /** @@ -366,7 +371,9 @@ public List> getIncrementalIterators(final final JsonNode sourceConfig = database.getSourceConfig(); if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) { final Duration firstRecordWaitTime = PostgresUtils.getFirstRecordWaitTime(sourceConfig); + final OptionalInt queueSize = OptionalInt.of(PostgresUtils.getQueueSize(sourceConfig)); LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); + LOGGER.info("Queue size: {}", queueSize.getAsInt()); final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil(); final JsonNode state = @@ -404,7 +411,7 @@ public List> getIncrementalIterators(final } final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, - PostgresCdcTargetPosition.targetPosition(database), false, firstRecordWaitTime); + PostgresCdcTargetPosition.targetPosition(database), false, firstRecordWaitTime, queueSize); final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager); final List streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog, diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index c82845f080527..10ce90d4550e4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.time.Duration; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,9 @@ public class PostgresUtils { public static final Duration MIN_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(2); public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20); public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5); + private static final int MIN_QUEUE_SIZE = 1000; + private static final int MAX_QUEUE_SIZE = 10000; + public static String getPluginValue(final JsonNode field) { return field.has("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN; @@ -71,6 +75,45 @@ public static Optional getFirstRecordWaitSeconds(final JsonNode config) return Optional.empty(); } + private static OptionalInt extractQueueSizeFromConfig(final JsonNode config) { + final JsonNode replicationMethod = config.get("replication_method"); + if (replicationMethod != null && replicationMethod.has("queue_size")) { + final int queueSize = config.get("replication_method").get("queue_size").asInt(); + return OptionalInt.of(queueSize); + } + return OptionalInt.empty(); + } + + public static int getQueueSize(final JsonNode config) { + final OptionalInt sizeFromConfig = extractQueueSizeFromConfig(config); + if (sizeFromConfig.isPresent()) { + int size = sizeFromConfig.getAsInt(); + if (size < MIN_QUEUE_SIZE) { + LOGGER.warn("Queue size is overridden to {} , which is the min allowed for safety.", + MIN_QUEUE_SIZE); + return MIN_QUEUE_SIZE; + } else if (size > MAX_QUEUE_SIZE) { + LOGGER.warn("Queue size is overridden to {} , which is the max allowed for safety.", + MAX_QUEUE_SIZE); + return MAX_QUEUE_SIZE; + } + return size; + } + return MAX_QUEUE_SIZE; + } + + public static void checkQueueSize(final JsonNode config) { + final OptionalInt queueSize = extractQueueSizeFromConfig(config); + if (queueSize.isPresent()) { + final int size = queueSize.getAsInt(); + if (size < MIN_QUEUE_SIZE || size > MAX_QUEUE_SIZE) { + throw new IllegalArgumentException( + String.format("queue_size must be between %d and %d ", + MIN_QUEUE_SIZE, MAX_QUEUE_SIZE)); + } + } + } + public static void checkFirstRecordWaitTime(final JsonNode config) { // we need to skip the check because in tests, we set initial_waiting_seconds // to 5 seconds for performance reasons, which is shorter than the minimum diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json index 4bc0c37260a1a..944c03a47a730 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json @@ -280,6 +280,15 @@ "min": 120, "max": 1200 }, + "queue_size": { + "type": "integer", + "title": "Size of the queue (Advanced)", + "description": "The size of the internal queue. This may interfere with memory consumption and efficiency of the connector, please be careful.", + "default": 10000, + "order": 6, + "min": 1000, + "max": 10000 + }, "lsn_commit_behaviour": { "type": "string", "title": "LSN commit behaviour", @@ -289,7 +298,7 @@ "After loading Data in the destination" ], "default": "After loading Data in the destination", - "order": 6 + "order": 7 } } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json index 2b89eccbc1f85..d03e067bb5501 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json @@ -280,6 +280,15 @@ "min": 120, "max": 1200 }, + "queue_size": { + "type": "integer", + "title": "Size of the queue (Advanced)", + "description": "The size of the internal queue. This may interfere with memory consumption and efficiency of the connector, please be careful.", + "default": 10000, + "order": 6, + "min": 1000, + "max": 10000 + }, "lsn_commit_behaviour": { "type": "string", "title": "LSN commit behaviour", @@ -289,7 +298,7 @@ "After loading Data in the destination" ], "default": "After loading Data in the destination", - "order": 6 + "order": 7 } } } diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 60c65193f621b..a092b87ca3b1c 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -341,6 +341,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.17 | 2023-05-25 | [26473](https://github.com/airbytehq/airbyte/pull/26473) | CDC : Limit queue size | | 1.0.16 | 2023-05-01 | [25740](https://github.com/airbytehq/airbyte/pull/25740) | Disable index logging | | 1.0.15 | 2023-04-26 | [25401](https://github.com/airbytehq/airbyte/pull/25401) | CDC : Upgrade Debezium to version 2.2.0 | | 1.0.14 | 2023-04-19 | [25345](https://github.com/airbytehq/airbyte/pull/25345) | Logging : Log database indexes per stream | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index cdfcb21f4811e..eef4a1c996380 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -262,6 +262,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.24 | 2023-05-25 | [26473](https://github.com/airbytehq/airbyte/pull/26473) | CDC : Limit queue size | | 2.0.23 | 2023-05-24 | [25586](https://github.com/airbytehq/airbyte/pull/25586) | No need to base64 encode strings on databases sorted with binary collation | | 2.0.22 | 2023-05-22 | [25859](https://github.com/airbytehq/airbyte/pull/25859) | Allow adding sessionVariables JDBC parameters | | 2.0.21 | 2023-05-10 | [25460](https://github.com/airbytehq/airbyte/pull/25460) | Handle a decimal number with 0 decimal points as an integer | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index e5d6470c00bae..664580ad54973 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -399,6 +399,7 @@ Some larger tables may encounter an error related to the temporary file size lim | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.30 | 2023-05-25 | [26473](https://github.com/airbytehq/airbyte/pull/26473) | CDC : Limit queue size | | 2.0.29 | 2023-05-18 | [25898](https://github.com/airbytehq/airbyte/pull/25898) | Translate Numeric values without decimal, e.g: NUMERIC(38,0), as BigInt instead of Double | | 2.0.28 | 2023-04-27 | [25401](https://github.com/airbytehq/airbyte/pull/25401) | CDC : Upgrade Debezium to version 2.2.0 | | 2.0.27 | 2023-04-26 | [24971](https://github.com/airbytehq/airbyte/pull/24971) | Emit stream status updates |