Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add airbyte trace utility to emit analytics messages & emit messages for MongoDB, Postgres & MySQL #35036

Merged
merged 18 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.17.2 | 2024-02-07 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. |
| 0.17.1 | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework. |
| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. |
| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.airbyte.cdk.db;

import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;

/**
* Utility class to define constants associated with database source connector analytics events. Make sure to add the analytics event to
* https://www.notion.so/Connector-Analytics-Events-892a79a49852465f8d59a18bd84c36de
*/
public class DbAnalyticsUtils {
public static final String CDC_CURSOR_INVALID_KEY = "db-sources-cdc-cursor-invalid";

public static AirbyteAnalyticsTraceMessage cdcCursorInvalidMessage() {
return new AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.integrations.base;

import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
Expand Down Expand Up @@ -50,6 +51,10 @@ public static void emitEstimateTrace(final long byteEstimate,
.withNamespace(streamNamespace))));
}

public static void emitAnalyticsTrace(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) {
emitMessage(makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage));
}

public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) {
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType));
}
Expand Down Expand Up @@ -86,6 +91,10 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage(
.withStackTrace(ExceptionUtils.getStackTrace(e))));
}

private static AirbyteMessage makeAnalyticsTraceAirbyteMessage(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) {
return new AirbyteMessage().withType(Type.TRACE).withTrace(new AirbyteTraceMessage().withAnalytics(airbyteAnalyticsTraceMessage));
}

private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) {
return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage());
}
Expand All @@ -97,5 +106,4 @@ private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTr
private static AirbyteTraceMessage makeAirbyteTraceMessage(final AirbyteTraceMessage.Type traceMessageType) {
return new AirbyteTraceMessage().withType(traceMessageType).withEmittedAt((double) System.currentTimeMillis());
}

}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.17.1
version=0.17.2
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void setUpOut() {
System.setOut(new PrintStream(outContent, true, StandardCharsets.UTF_8));
}

private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) {
private void assertJsonNodeIsTraceMessage(final JsonNode jsonNode) {
// todo: this check could be better by actually trying to convert the JsonNode to an
// AirbyteTraceMessage instance
Assertions.assertEquals("TRACE", jsonNode.get("type").asText());
Expand All @@ -36,15 +36,15 @@ private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) {
@Test
void testEmitSystemErrorTrace() {
AirbyteTraceMessageUtility.emitSystemErrorTrace(Mockito.mock(RuntimeException.class), "this is a system error");
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
assertJsonNodeIsTraceMessage(outJson);
Assertions.assertEquals("system_error", outJson.get("trace").get("error").get("failure_type").asText());
}

@Test
void testEmitConfigErrorTrace() {
AirbyteTraceMessageUtility.emitConfigErrorTrace(Mockito.mock(RuntimeException.class), "this is a config error");
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
assertJsonNodeIsTraceMessage(outJson);
Assertions.assertEquals("config_error", outJson.get("trace").get("error").get("failure_type").asText());
}
Expand All @@ -58,11 +58,11 @@ void testEmitErrorTrace() {
@Test
void testCorrectStacktraceFormat() {
try {
int x = 1 / 0;
} catch (Exception e) {
final int x = 1 / 0;
} catch (final Exception e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "you exploded the universe");
}
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
Assertions.assertTrue(outJson.get("trace").get("error").get("stack_trace").asText().contains("\n\tat"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.16.3'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

package io.airbyte.integrations.source.mongodb.cdc;

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -110,6 +113,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
optSavedOffset.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient)).isPresent();

if (!savedOffsetIsValid) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh.");
// If the offset in the state is invalid, reset the state to the initial STATE
stateManager.resetState(new MongoDbCdcState(initialDebeziumState, config.getEnforceSchema()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ java {
airbyteJavaConnector {
cdkVersionRequired = '0.16.6'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

package io.airbyte.integrations.source.postgres.cdc;

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
Expand Down Expand Up @@ -109,6 +111,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
savedOffset);

if (!savedOffsetAfterReplicationSlotLSN) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
// We do not want to acknowledge the WAL logs in debug mode.
Expand Down
Loading