From 900563ea4b1cb114a32bf2e00a20e584e8eb8c07 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 8 Feb 2024 10:09:18 -0800 Subject: [PATCH 1/9] Add airbyte trace utility to emit analytics messages --- .../base/AirbyteTraceMessageUtility.java | 10 +++++++++- .../base/AirbyteTraceMessageUtilityTest.java | 12 ++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java index 943f2cf8ba757..bb4784cb75817 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.base; import io.airbyte.commons.stream.AirbyteStreamStatusHolder; +import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage; import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage; import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType; import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage; @@ -50,6 +51,10 @@ public static void emitEstimateTrace(final long byteEstimate, .withNamespace(streamNamespace)))); } + public static void emitAnalyticsTrace(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) { + emitMessage(makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage)); + } + public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) { emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType)); } @@ -86,6 +91,10 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage( .withStackTrace(ExceptionUtils.getStackTrace(e)))); } + private static AirbyteMessage makeAnalyticsTraceAirbyteMessage(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) { + return new AirbyteMessage().withType(Type.TRACE).withTrace(new AirbyteTraceMessage().withAnalytics(airbyteAnalyticsTraceMessage)); + } + private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) { return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage()); } @@ -97,5 +106,4 @@ private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTr private static AirbyteTraceMessage makeAirbyteTraceMessage(final AirbyteTraceMessage.Type traceMessageType) { return new AirbyteTraceMessage().withType(traceMessageType).withEmittedAt((double) System.currentTimeMillis()); } - } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java index f75f7a01ac99a..6ea6492d29603 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java @@ -26,7 +26,7 @@ public void setUpOut() { System.setOut(new PrintStream(outContent, true, StandardCharsets.UTF_8)); } - private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) { + private void assertJsonNodeIsTraceMessage(final JsonNode jsonNode) { // todo: this check could be better by actually trying to convert the JsonNode to an // AirbyteTraceMessage instance Assertions.assertEquals("TRACE", jsonNode.get("type").asText()); @@ -36,7 +36,7 @@ private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) { @Test void testEmitSystemErrorTrace() { AirbyteTraceMessageUtility.emitSystemErrorTrace(Mockito.mock(RuntimeException.class), "this is a system error"); - JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); + final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); assertJsonNodeIsTraceMessage(outJson); Assertions.assertEquals("system_error", outJson.get("trace").get("error").get("failure_type").asText()); } @@ -44,7 +44,7 @@ void testEmitSystemErrorTrace() { @Test void testEmitConfigErrorTrace() { AirbyteTraceMessageUtility.emitConfigErrorTrace(Mockito.mock(RuntimeException.class), "this is a config error"); - JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); + final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); assertJsonNodeIsTraceMessage(outJson); Assertions.assertEquals("config_error", outJson.get("trace").get("error").get("failure_type").asText()); } @@ -58,11 +58,11 @@ void testEmitErrorTrace() { @Test void testCorrectStacktraceFormat() { try { - int x = 1 / 0; - } catch (Exception e) { + final int x = 1 / 0; + } catch (final Exception e) { AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "you exploded the universe"); } - JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); + final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); Assertions.assertTrue(outJson.get("trace").get("error").get("stack_trace").asText().contains("\n\tat")); } From ad3093e5dbd40298686e74d0bac42c7e041904f4 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 8 Feb 2024 14:12:19 -0800 Subject: [PATCH 2/9] Emit analytics traces --- airbyte-cdk/java/airbyte-cdk/README.md | 5 +++-- .../java/io/airbyte/cdk/db/DbAnalyticsUtils.java | 15 +++++++++++++++ .../core/src/main/resources/version.properties | 2 +- .../connectors/source-mongodb-v2/build.gradle | 2 +- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 4 ++++ .../connectors/source-postgres/build.gradle | 2 +- .../postgres/cdc/PostgresCdcCtidInitializer.java | 2 ++ 7 files changed, 27 insertions(+), 5 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 1d505d8b172bc..0b8a14a0c56ec 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,8 +166,9 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. | -| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. | +| 0.17.1 | 2024-02-07 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. | +| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. | +| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. | | 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic | | 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest | | 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java new file mode 100644 index 0000000000000..d1cc385b131aa --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java @@ -0,0 +1,15 @@ +package io.airbyte.cdk.db; + +import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage; + +/** + * Utility class to define constants associated with database source connector analytics events. Make sure to add the analytics event to + * https://www.notion.so/Connector-Analytics-Events-892a79a49852465f8d59a18bd84c36de + */ +public class DbAnalyticsUtils { + public static final String CDC_CURSOR_INVALID_KEY = "db-sources-cdc-cursor-invalid"; + + public static AirbyteAnalyticsTraceMessage cdcCursorInvalidMessage() { + return new AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1"); + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 9f28a3ffc22b0..beefc2ce98a7e 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.17.0 +version=0.17.1 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 43fb1b80b40dc..fd5974073b578 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -7,7 +7,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.16.3' features = ['db-sources'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 4e6078a6b2f3a..77db930a699e0 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -4,10 +4,13 @@ package io.airbyte.integrations.source.mongodb.cdc; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoDatabase; +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil; import io.airbyte.commons.json.Jsons; @@ -110,6 +113,7 @@ public List> createCdcIterators( optSavedOffset.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient)).isPresent(); if (!savedOffsetIsValid) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh."); // If the offset in the state is invalid, reset the state to the initial STATE stateManager.resetState(new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())); diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 3279caaab514d..3a75fc17ae821 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -15,7 +15,7 @@ java { airbyteJavaConnector { cdkVersionRequired = '0.16.6' features = ['db-sources'] - useLocalCdk = false + useLocalCdk = true } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index a579000bd4c5e..73df20d9cc219 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter; import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager; @@ -109,6 +110,7 @@ public static List> cdcCtidIteratorsCombin savedOffset); if (!savedOffsetAfterReplicationSlotLSN) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch"); } else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) { // We do not want to acknowledge the WAL logs in debug mode. From 67921f314b4bea3ae253b3e06547781ef958773d Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 8 Feb 2024 14:26:12 -0800 Subject: [PATCH 3/9] fix --- .../java/airbyte-cdk/core/src/main/resources/version.properties | 2 +- .../source/postgres/cdc/PostgresCdcCtidInitializer.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index beefc2ce98a7e..b7d2c2b03aa27 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.17.1 +version=0.17.2 diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index 73df20d9cc219..74c1c28f6b7cb 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.postgres.cdc; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum; import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode; import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList; From 49f695f372f55f4f9a6b75c34a780ac975fca13a Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 12 Feb 2024 10:52:03 -0800 Subject: [PATCH 4/9] Fix trace message --- .../cdk/integrations/base/AirbyteTraceMessageUtility.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java index bb4784cb75817..3dd402d068d0c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java @@ -12,6 +12,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteTraceMessage; +import java.time.Instant; import java.util.function.Consumer; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -92,7 +93,11 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage( } private static AirbyteMessage makeAnalyticsTraceAirbyteMessage(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) { - return new AirbyteMessage().withType(Type.TRACE).withTrace(new AirbyteTraceMessage().withAnalytics(airbyteAnalyticsTraceMessage)); + return new AirbyteMessage().withType(Type.TRACE) + .withTrace(new AirbyteTraceMessage() + .withAnalytics(airbyteAnalyticsTraceMessage) + .withType(AirbyteTraceMessage.Type.ANALYTICS) + .withEmittedAt((double) Instant.now().toEpochMilli())); } private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) { From 62b9a670cfc63e28656935543a3bcf1487fe3c1a Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 13 Feb 2024 10:06:31 -0800 Subject: [PATCH 5/9] Update docs --- .../connectors/source-mongodb-v2/metadata.yaml | 2 +- airbyte-integrations/connectors/source-postgres/metadata.yaml | 2 +- docs/integrations/sources/mongodb-v2.md | 3 ++- docs/integrations/sources/postgres.md | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 7151f5e74f337..f9729994caf91 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.8 + dockerImageTag: 1.2.9 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index ac29d8cdf36e5..51c9ab87321b7 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.8 + dockerImageTag: 3.3.9 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 530a1544c1a37..4c3a524228043 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| -| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | +| 1.2.9 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | +| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | | 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. | | 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. | | 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index ef795a6397292..401c382f6c486 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -292,7 +292,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 | +| 3.3.9 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | +| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 | | 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. | | 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 | | 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 | From 3ab6ca58777d034e1d9efa98937feb948b9eed55 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 13 Feb 2024 11:39:55 -0800 Subject: [PATCH 6/9] FIx formatting --- .../main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java | 9 ++++++++- .../integrations/base/AirbyteTraceMessageUtility.java | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java index d1cc385b131aa..c10cffa61fe2f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java @@ -1,15 +1,22 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.cdk.db; import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage; /** - * Utility class to define constants associated with database source connector analytics events. Make sure to add the analytics event to + * Utility class to define constants associated with database source connector analytics events. + * Make sure to add the analytics event to * https://www.notion.so/Connector-Analytics-Events-892a79a49852465f8d59a18bd84c36de */ public class DbAnalyticsUtils { + public static final String CDC_CURSOR_INVALID_KEY = "db-sources-cdc-cursor-invalid"; public static AirbyteAnalyticsTraceMessage cdcCursorInvalidMessage() { return new AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1"); } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java index 3dd402d068d0c..d3513c336681f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java @@ -111,4 +111,5 @@ private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTr private static AirbyteTraceMessage makeAirbyteTraceMessage(final AirbyteTraceMessage.Type traceMessageType) { return new AirbyteTraceMessage().withType(traceMessageType).withEmittedAt((double) System.currentTimeMillis()); } + } From 327137ada1aabcd211ddaf5b0c856b065aa32594 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 13 Feb 2024 13:22:18 -0800 Subject: [PATCH 7/9] bump pg version --- airbyte-integrations/connectors/source-postgres/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 51c9ab87321b7..a7936f242b136 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.9 + dockerImageTag: 3.3.10 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres From 8555f9e2f19b4c88d8324a64b5a18c8aeadeb089 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 13 Feb 2024 14:20:42 -0800 Subject: [PATCH 8/9] Update metadata.yaml --- airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index f9729994caf91..cde2b8488af3c 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.9 + dockerImageTag: 1.2.10 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 From cd52b98030e184a143d56edd6fcca0bd52448afb Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 13 Feb 2024 14:38:30 -0800 Subject: [PATCH 9/9] Toggle useLocalCdk and bump versions --- .../connectors/source-mongodb-v2/build.gradle | 4 ++-- airbyte-integrations/connectors/source-mysql/build.gradle | 2 +- airbyte-integrations/connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/initialsync/MySqlInitialReadUtil.java | 2 ++ airbyte-integrations/connectors/source-postgres/build.gradle | 4 ++-- docs/integrations/sources/mysql.md | 1 + 6 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 53059dfe0feca..b160c88b04b3d 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.19.0' + cdkVersionRequired = '0.20.6' features = ['db-sources', 'datastore-mongo'] - useLocalCdk = true + useLocalCdk = false } application { diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 11b9d5b4e156f..a4dd5c22b0b59 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.5' + cdkVersionRequired = '0.20.6' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index a50e4a1846552..68a3b86807fe0 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.6 + dockerImageTag: 3.3.7 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 5c3e6516bf390..ff0b7a477e5cf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql.initialsync; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET; @@ -109,6 +110,7 @@ public static List> getCdcReadIterators(fi savedOffset.isPresent() && mySqlDebeziumStateUtil.savedOffsetStillPresentOnServer(database, savedOffset.get()); if (!savedOffsetStillPresentOnServer) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index ae8c79d794a17..d68232e1e409d 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,9 +12,9 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.20.4' + cdkVersionRequired = '0.20.6' features = ['db-sources', 'datastore-postgres'] - useLocalCdk = true + useLocalCdk = false } application { diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 7ad708fe1736c..09117fd7e2807 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | | 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. | | 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | | 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 |