Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdc): limit queue size to lower memory consumption #26473

Merged
merged 12 commits into from
May 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
Expand All @@ -41,15 +42,18 @@ public class AirbyteDebeziumHandler<T> {
private final CdcTargetPosition<T> targetPosition;
private final boolean trackSchemaHistory;
private final Duration firstRecordWaitTime;
private final OptionalInt queueSize;

public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition<T> targetPosition,
final boolean trackSchemaHistory,
final Duration firstRecordWaitTime) {
final Duration firstRecordWaitTime,
final OptionalInt queueSize) {
this.config = config;
this.targetPosition = targetPosition;
this.trackSchemaHistory = trackSchemaHistory;
this.firstRecordWaitTime = firstRecordWaitTime;
this.queueSize = queueSize;
}

public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
Expand All @@ -60,7 +64,7 @@ public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
final Instant emittedAt) {

LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY));

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties,
Expand Down Expand Up @@ -93,7 +97,7 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
final Instant emittedAt,
final boolean addDbNameToState) {
LOGGER.info("Using CDC: {}", true);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY));
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class DebeziumPropertiesManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumPropertiesManager.class);
private static final String BYTE_VALUE_256_MB = Integer.toString(256 * 1024 * 1024);
private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;
Expand All @@ -46,30 +47,33 @@ public Properties getDebeziumProperties() {
props.putAll(properties);

// debezium engine configuration
// https://debezium.io/documentation/reference/2.1/development/engine.html#engine-properties
// https://debezium.io/documentation/reference/2.2/development/engine.html#engine-properties
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
// default values from debezium CommonConnectorConfig
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");

props.setProperty("errors.max.retries", "10");
// Disabling retries because debezium startup time might exceed our 60-second wait limit
// The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, >
// 0 = num of retries).
props.setProperty("errors.max.retries", "0");
// This property must be strictly less than errors.retry.delay.max.ms
// (https://github.com/debezium/debezium/blob/bcc7d49519a4f07d123c616cfa45cd6268def0b9/debezium-core/src/main/java/io/debezium/util/DelayStrategy.java#L135)
props.setProperty("errors.retry.delay.initial.ms", "299");
props.setProperty("errors.retry.delay.max.ms", "300");

if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/2.1/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", schemaHistoryManager.get().getPath().toString());
}

// https://debezium.io/documentation/reference/2.1/configuration/avro.html
// https://debezium.io/documentation/reference/2.2/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

Expand All @@ -89,10 +93,13 @@ public Properties getDebeziumProperties() {
// By default "decimal.handing.mode=precise" which's caused returning this value as a binary.
// The "double" type may cause a loss of precision, so set Debezium's config to store it as a String
// explicitly in its Kafka messages for more details see:
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");

// https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-property-max-queue-size-in-bytes
props.setProperty("max.queue.size.in.bytes", BYTE_VALUE_256_MB);

// WARNING : Never change the value of this otherwise all the connectors would start syncing from
// scratch
props.setProperty("topic.prefix", config.get(JdbcUtils.DATABASE_KEY).asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void start(final BlockingQueue<ChangeEvent<String, String>> queue) {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
// interacts with kafka. we want to ignore it.
// more on the tombstone:
// https://debezium.io/documentation/reference/2.1/transformations/event-flattening.html
// https://debezium.io/documentation/reference/2.2/transformations/event-flattening.html
if (e.value() != null) {
try {
queue.put(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private int getTimePrecision(final RelationalColumn field) {
}

// Ref :
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-temporal-types
// https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-temporal-types
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.16
LABEL io.airbyte.version=1.0.17
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 1.0.16
dockerImageTag: 1.0.17
dockerRepository: airbyte/source-mssql-strict-encrypt
githubIssueLabel: source-mssql
icon: mssql.svg
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.16
LABEL io.airbyte.version=1.0.17
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 1.0.16
dockerImageTag: 1.0.17
dockerRepository: airbyte/source-mssql
githubIssueLabel: source-mssql
icon: mssql.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum ReplicationMethod {
* The default "SNAPSHOT" mode can prevent other (non-Airbyte) transactions from updating table rows
* while we snapshot. References:
* https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
* https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
* https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
*/
public enum SnapshotIsolation {

Expand Down Expand Up @@ -69,7 +69,7 @@ public static SnapshotIsolation from(final String jsonValue) {

}

// https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-snapshot-mode
// https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-snapshot-mode
public enum DataToSync {

EXISTING_AND_NEW("Existing and New", "initial"),
Expand Down Expand Up @@ -146,9 +146,9 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi
final Properties props = new Properties();
props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");

// https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-include-schema-changes
// https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
// https://debezium.io/documentation/reference/2.1/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
// https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
props.setProperty("provide.transaction.metadata", "false");

props.setProperty("converters", "mssql_converter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -449,7 +450,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig);
final AirbyteDebeziumHandler<Lsn> handler =
new AirbyteDebeziumHandler<>(sourceConfig,
MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()), true, firstRecordWaitTime);
MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()), true, firstRecordWaitTime, OptionalInt.empty());

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.23
LABEL io.airbyte.version=2.0.24

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 2.0.23
dockerImageTag: 2.0.24
dockerRepository: airbyte/source-mysql-strict-encrypt
githubIssueLabel: source-mysql
icon: mysql.svg
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.23
LABEL io.airbyte.version=2.0.24

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 2.0.23
dockerImageTag: 2.0.24
dockerRepository: airbyte/source-mysql
githubIssueLabel: source-mysql
icon: mysql.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database) {
// initial snapshot
props.setProperty("snapshot.mode", sourceConfig.get("snapshot_mode").asText());
} else {
// https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-snapshot-mode
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "when_needed");
}

Expand All @@ -52,8 +52,8 @@ private static Properties commonProperties(final JdbcDatabase database) {
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

props.setProperty("database.server.id", String.valueOf(generateServerID()));
// https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-boolean-values
// https://debezium.io/documentation/reference/2.1/development/converters.html
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-boolean-values
// https://debezium.io/documentation/reference/2.2/development/converters.html
/**
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter}
* {@link MySQLConverter}
Expand All @@ -74,7 +74,7 @@ private static Properties commonProperties(final JdbcDatabase database) {
}

// Check params for SSL connection in config and add properties for CDC SSL connection
// https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-database-ssl-mode
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-database-ssl-mode
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
if (dbConfig.has(SSL_MODE) && !dbConfig.get(SSL_MODE).asText().isEmpty()) {
props.setProperty("database.ssl.mode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText())));
Expand All @@ -100,15 +100,15 @@ private static Properties commonProperties(final JdbcDatabase database) {
}
}

// https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-snapshot-locking-mode
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while Airbyte is
// taking a snapshot. There is a risk involved that
// if any database client makes a schema change then the sync might break
props.setProperty("snapshot.locking.mode", "none");
// https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-include-schema-changes
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
// This to make sure that binary data represented as a base64-encoded String.
// https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-property-binary-handling-mode
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode
props.setProperty("binary.handling.mode", "base64");
props.setProperty("database.include.list", sourceConfig.get("database").asText());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -274,7 +275,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig);
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
final AirbyteDebeziumHandler<MySqlCdcPosition> handler =
new AirbyteDebeziumHandler<>(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime);
new AirbyteDebeziumHandler<>(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime, OptionalInt.empty());

final MySqlCdcStateHandler mySqlCdcStateHandler = new MySqlCdcStateHandler(stateManager);
final MySqlCdcConnectorMetadataInjector mySqlCdcConnectorMetadataInjector = new MySqlCdcConnectorMetadataInjector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.29
LABEL io.airbyte.version=2.0.30
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
maxSecondsBetweenMessages: 7200
dockerImageTag: 2.0.29
dockerImageTag: 2.0.30
dockerRepository: airbyte/source-postgres-strict-encrypt
githubIssueLabel: source-postgres
icon: postgresql.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,15 @@
"min": 120,
"max": 1200
},
"queue_size": {
"type": "integer",
"title": "Size of the queue (Advanced)",
"description": "The size of the internal queue. This may interfere with memory consumption and efficiency of the connector, please be careful.",
"default": 10000,
"order": 6,
"min": 1000,
"max": 10000
},
"lsn_commit_behaviour": {
"type": "string",
"title": "LSN commit behaviour",
Expand All @@ -290,7 +299,7 @@
"After loading Data in the destination"
],
"default": "After loading Data in the destination",
"order": 6
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.29
LABEL io.airbyte.version=2.0.30
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 2.0.29
dockerImageTag: 2.0.30
maxSecondsBetweenMessages: 7200
dockerRepository: airbyte/source-postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static Properties commonProperties(final JdbcDatabase database) {
}

// Check params for SSL connection in config and add properties for CDC SSL connection
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-property-database-sslmode
// https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-property-database-sslmode
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
if (sourceConfig.has(JdbcUtils.SSL_MODE_KEY) && sourceConfig.get(JdbcUtils.SSL_MODE_KEY).has(JdbcUtils.MODE_KEY)) {

Expand Down
Loading