diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 327bf2795170b..8e25ce5abb086 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. | | 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator there is an underlying stream failure. | | 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. | | 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java new file mode 100644 index 0000000000000..c10cffa61fe2f --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/DbAnalyticsUtils.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.db; + +import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage; + +/** + * Utility class to define constants associated with database source connector analytics events. + * Make sure to add the analytics event to + * https://www.notion.so/Connector-Analytics-Events-892a79a49852465f8d59a18bd84c36de + */ +public class DbAnalyticsUtils { + + public static final String CDC_CURSOR_INVALID_KEY = "db-sources-cdc-cursor-invalid"; + + public static AirbyteAnalyticsTraceMessage cdcCursorInvalidMessage() { + return new AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1"); + } + +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java index 943f2cf8ba757..d3513c336681f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.java @@ -5,12 +5,14 @@ package io.airbyte.cdk.integrations.base; import io.airbyte.commons.stream.AirbyteStreamStatusHolder; +import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage; import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage; import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType; import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteTraceMessage; +import java.time.Instant; import java.util.function.Consumer; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -50,6 +52,10 @@ public static void emitEstimateTrace(final long byteEstimate, .withNamespace(streamNamespace)))); } + public static void emitAnalyticsTrace(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) { + emitMessage(makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage)); + } + public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) { emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType)); } @@ -86,6 +92,14 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage( .withStackTrace(ExceptionUtils.getStackTrace(e)))); } + private static AirbyteMessage makeAnalyticsTraceAirbyteMessage(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) { + return new AirbyteMessage().withType(Type.TRACE) + .withTrace(new AirbyteTraceMessage() + .withAnalytics(airbyteAnalyticsTraceMessage) + .withType(AirbyteTraceMessage.Type.ANALYTICS) + .withEmittedAt((double) Instant.now().toEpochMilli())); + } + private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) { return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage()); } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 09d1c8ab8e97d..7d49962d75532 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.5 \ No newline at end of file +version=0.20.6 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java index f75f7a01ac99a..6ea6492d29603 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.java @@ -26,7 +26,7 @@ public void setUpOut() { System.setOut(new PrintStream(outContent, true, StandardCharsets.UTF_8)); } - private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) { + private void assertJsonNodeIsTraceMessage(final JsonNode jsonNode) { // todo: this check could be better by actually trying to convert the JsonNode to an // AirbyteTraceMessage instance Assertions.assertEquals("TRACE", jsonNode.get("type").asText()); @@ -36,7 +36,7 @@ private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) { @Test void testEmitSystemErrorTrace() { AirbyteTraceMessageUtility.emitSystemErrorTrace(Mockito.mock(RuntimeException.class), "this is a system error"); - JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); + final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); assertJsonNodeIsTraceMessage(outJson); Assertions.assertEquals("system_error", outJson.get("trace").get("error").get("failure_type").asText()); } @@ -44,7 +44,7 @@ void testEmitSystemErrorTrace() { @Test void testEmitConfigErrorTrace() { AirbyteTraceMessageUtility.emitConfigErrorTrace(Mockito.mock(RuntimeException.class), "this is a config error"); - JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); + final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); assertJsonNodeIsTraceMessage(outJson); Assertions.assertEquals("config_error", outJson.get("trace").get("error").get("failure_type").asText()); } @@ -58,11 +58,11 @@ void testEmitErrorTrace() { @Test void testCorrectStacktraceFormat() { try { - int x = 1 / 0; - } catch (Exception e) { + final int x = 1 / 0; + } catch (final Exception e) { AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "you exploded the universe"); } - JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); + final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)); Assertions.assertTrue(outJson.get("trace").get("error").get("stack_trace").asText().contains("\n\tat")); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 99bfbb91a6e2e..b160c88b04b3d 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.19.0' + cdkVersionRequired = '0.20.6' features = ['db-sources', 'datastore-mongo'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index f9729994caf91..cde2b8488af3c 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.9 + dockerImageTag: 1.2.10 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index ab322e37b8162..8b73e6ab3f402 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -4,10 +4,13 @@ package io.airbyte.integrations.source.mongodb.cdc; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoDatabase; +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; @@ -112,6 +115,7 @@ public List> createCdcIterators( optSavedOffset.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient)).isPresent(); if (!savedOffsetIsValid) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh."); // If the offset in the state is invalid, reset the state to the initial STATE stateManager.resetState(new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema())); diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 11b9d5b4e156f..a4dd5c22b0b59 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.5' + cdkVersionRequired = '0.20.6' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index a50e4a1846552..68a3b86807fe0 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.6 + dockerImageTag: 3.3.7 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 5c3e6516bf390..ff0b7a477e5cf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql.initialsync; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET; @@ -109,6 +110,7 @@ public static List> getCdcReadIterators(fi savedOffset.isPresent() && mySqlDebeziumStateUtil.savedOffsetStillPresentOnServer(database, savedOffset.get()); if (!savedOffsetStillPresentOnServer) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 6c2428fe6248b..d68232e1e409d 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,7 +12,7 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.20.4' + cdkVersionRequired = '0.20.6' features = ['db-sources', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 51c9ab87321b7..a7936f242b136 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.9 + dockerImageTag: 3.3.10 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index a579000bd4c5e..74c1c28f6b7cb 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.postgres.cdc; +import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum; import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode; import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList; @@ -11,6 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter; import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager; @@ -109,6 +111,7 @@ public static List> cdcCtidIteratorsCombin savedOffset); if (!savedOffsetAfterReplicationSlotLSN) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch"); } else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) { // We do not want to acknowledge the WAL logs in debug mode. diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 72873c05fc0be..4515d39997fb0 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.2.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | | 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes | | 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | | 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 7ad708fe1736c..09117fd7e2807 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | | 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. | | 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | | 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 20717f097f30c..4c8649e7c5bf4 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -292,8 +292,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 | -| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 | +| 3.3.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | +| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 | +| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 | | 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. | | 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 | | 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |