From 41c11f6d09c9a3bebf9ed12964d8f02f7d453463 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 22 Jul 2022 15:43:35 -0700 Subject: [PATCH 1/4] update mysqlsource for latest constructor --- .../java/io/airbyte/integrations/source/mysql/MySqlSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 4e7fa29875dc5..1b2f4dfc911a4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -35,6 +35,7 @@ import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.SyncMode; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -178,7 +179,7 @@ public List> getIncrementalIterators(final final JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true); + new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes); final Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); From 43cac453e19e6af0c0dbb12c606ae1d1d083e78c Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 22 Jul 2022 16:07:39 -0700 Subject: [PATCH 2/4] turns out editing code in the github ui isn't a great idea --- .../java/io/airbyte/integrations/source/mysql/MySqlSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 1b2f4dfc911a4..10ced47226ca4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -179,7 +179,7 @@ public List> getIncrementalIterators(final final JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes); + new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(5)); final Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); From a152734fb7802c50991455d9c2246e17e941e654 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 22 Jul 2022 16:08:27 -0700 Subject: [PATCH 3/4] 1 minute --- .../java/io/airbyte/integrations/source/mysql/MySqlSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 10ced47226ca4..b088e365dc032 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -179,7 +179,7 @@ public List> getIncrementalIterators(final final JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(5)); + new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(1)); final Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); From beabf81bec4f91afeb11b47319748c93c1c0b4e2 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 22 Jul 2022 16:11:53 -0700 Subject: [PATCH 4/4] 5 minutes because mysql is slower than postgres --- .../java/io/airbyte/integrations/source/mysql/MySqlSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index b088e365dc032..10ced47226ca4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -179,7 +179,7 @@ public List> getIncrementalIterators(final final JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(1)); + new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(5)); final Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null));