Skip to content

Commit a4b8edf

Browse files
authored
Add detailed sentry tracing for JDBC destination stream consumer (#9898)
* Refactor airbyte sentry * Add more sentry monitoring for jdbc destination * Profile operations from failure tracking consumer * Remove redundant method call * Update operation names * Trace snowflake starting process * Trace snowflake copying step * Move tracing to sql operation
1 parent 1096a5a commit a4b8edf

File tree

10 files changed

+177
-84
lines changed

10 files changed

+177
-84
lines changed

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteMessageConsumer.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import io.airbyte.commons.concurrency.VoidCallable;
88
import io.airbyte.commons.functional.CheckedConsumer;
9+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
910
import io.airbyte.protocol.models.AirbyteMessage;
11+
import org.elasticsearch.common.collect.Map;
1012

1113
/**
1214
* Interface for the destination's consumption of incoming records wrapped in an
@@ -46,7 +48,8 @@ static AirbyteMessageConsumer appendOnClose(final AirbyteMessageConsumer consume
4648

4749
@Override
4850
public void start() throws Exception {
49-
consumer.start();
51+
AirbyteSentry.executeWithTracing("StartConsumer", consumer::start,
52+
Map.of("consumerImpl", "appendOnClose"));
5053
}
5154

5255
@Override
@@ -56,8 +59,10 @@ public void accept(final AirbyteMessage message) throws Exception {
5659

5760
@Override
5861
public void close() throws Exception {
59-
consumer.close();
60-
voidCallable.call();
62+
AirbyteSentry.executeWithTracing("CloseConsumer", () -> {
63+
consumer.close();
64+
voidCallable.call();
65+
}, Map.of("consumerImpl", "appendOnClose"));
6166
}
6267

6368
};

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingAirbyteMessageConsumer.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package io.airbyte.integrations.base;
66

7+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
78
import io.airbyte.protocol.models.AirbyteMessage;
9+
import java.util.Map;
810
import org.slf4j.Logger;
911
import org.slf4j.LoggerFactory;
1012

@@ -31,7 +33,8 @@ public abstract class FailureTrackingAirbyteMessageConsumer implements AirbyteMe
3133
@Override
3234
public void start() throws Exception {
3335
try {
34-
startTracked();
36+
AirbyteSentry.executeWithTracing("StartConsumer", this::startTracked,
37+
Map.of("consumerImpl", FailureTrackingAirbyteMessageConsumer.class.getSimpleName()));
3538
} catch (final Exception e) {
3639
hasFailed = true;
3740
throw e;
@@ -59,7 +62,8 @@ public void close() throws Exception {
5962
} else {
6063
LOGGER.info("Airbyte message consumer: succeeded.");
6164
}
62-
close(hasFailed);
65+
AirbyteSentry.executeWithTracing("CloseConsumer", () -> close(hasFailed),
66+
Map.of("consumerImpl", FailureTrackingAirbyteMessageConsumer.class.getSimpleName()));
6367
}
6468

6569
}

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.airbyte.commons.io.IOs;
1111
import io.airbyte.commons.json.Jsons;
1212
import io.airbyte.commons.util.AutoCloseableIterator;
13+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
1314
import io.airbyte.protocol.models.AirbyteConnectionStatus;
1415
import io.airbyte.protocol.models.AirbyteMessage;
1516
import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -114,14 +115,8 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
114115
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
115116
} catch (final Exception e) {
116117
// if validation fails don't throw an exception, return a failed connection check message
117-
outputRecordCollector
118-
.accept(
119-
new AirbyteMessage()
120-
.withType(Type.CONNECTION_STATUS)
121-
.withConnectionStatus(
122-
new AirbyteConnectionStatus()
123-
.withStatus(AirbyteConnectionStatus.Status.FAILED)
124-
.withMessage(e.getMessage())));
118+
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(
119+
new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage())));
125120
}
126121

127122
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
@@ -141,7 +136,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
141136
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
142137
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
143138
try (messageIterator) {
144-
messageIterator.forEachRemaining(outputRecordCollector::accept);
139+
AirbyteSentry.executeWithTracing("ReadSource", () -> messageIterator.forEachRemaining(outputRecordCollector::accept));
145140
}
146141
}
147142
// destination only
@@ -150,7 +145,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
150145
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
151146
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
152147
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector);
153-
consumeWriteStream(consumer);
148+
AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
154149
}
155150
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
156151
}

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/sentry/AirbyteSentry.java

+69-19
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,49 @@
77
import io.sentry.ISpan;
88
import io.sentry.Sentry;
99
import io.sentry.SpanStatus;
10-
import java.util.concurrent.Callable;
10+
import java.util.Collections;
11+
import java.util.Map;
12+
import org.apache.logging.log4j.util.Strings;
1113

1214
public class AirbyteSentry {
1315

16+
private static final String DEFAULT_ROOT_TRANSACTION = "ROOT";
17+
private static final String DEFAULT_UNKNOWN_OPERATION = "ANONYMOUS";
18+
1419
@FunctionalInterface
15-
public interface ThrowingRunnable {
20+
public interface ThrowingRunnable<E extends Exception> {
1621

17-
void call() throws Exception;
22+
void call() throws E;
1823

1924
}
2025

21-
public static void runWithSpan(final String operation, final ThrowingRunnable command) throws Exception {
22-
final ISpan span = Sentry.getSpan();
23-
final ISpan childSpan;
24-
if (span == null) {
25-
childSpan = Sentry.startTransaction("ROOT", operation);
26-
} else {
27-
childSpan = span.startChild(operation);
28-
}
26+
@FunctionalInterface
27+
public interface ThrowingCallable<T, E extends Exception> {
28+
29+
T call() throws E;
30+
31+
}
32+
33+
/**
34+
* Run an operation and profile it with Sentry. The operation will run in a span under the current
35+
* Sentry transaction. If no transaction exists, a default one will be created.
36+
*/
37+
public static <E extends Exception> void executeWithTracing(final String name, final ThrowingRunnable<E> command) throws E {
38+
executeWithTracing(name, command, Collections.emptyMap());
39+
}
40+
41+
/**
42+
* Run an operation and profile it with Sentry. The operation will run in a span under the current
43+
* Sentry transaction. If no transaction exists, a default one will be created.
44+
*
45+
* @param metadata Extra data about this operation. For example: { "stream": "table1",
46+
* "recordCount": 1000 }.
47+
*/
48+
public static <E extends Exception> void executeWithTracing(final String name,
49+
final ThrowingRunnable<E> command,
50+
final Map<String, Object> metadata)
51+
throws E {
52+
final ISpan childSpan = createChildSpan(Sentry.getSpan(), name, metadata);
2953
try {
3054
command.call();
3155
childSpan.finish(SpanStatus.OK);
@@ -38,14 +62,26 @@ public static void runWithSpan(final String operation, final ThrowingRunnable co
3862
}
3963
}
4064

41-
public static <T> T runWithSpan(final String operation, final Callable<T> command) throws Exception {
42-
final ISpan span = Sentry.getSpan();
43-
final ISpan childSpan;
44-
if (span == null) {
45-
childSpan = Sentry.startTransaction("ROOT", operation);
46-
} else {
47-
childSpan = span.startChild(operation);
48-
}
65+
/**
66+
* Run an operation and profile it with Sentry. The operation will run in a span under the current
67+
* Sentry transaction. If no transaction exists, a default one will be created.
68+
*/
69+
public static <T, E extends Exception> T queryWithTracing(final String name, final ThrowingCallable<T, E> command) throws E {
70+
return queryWithTracing(name, command, Collections.emptyMap());
71+
}
72+
73+
/**
74+
* Run an operation and profile it with Sentry. The operation will run in a span under the current
75+
* Sentry transaction. If no transaction exists, a default one will be created.
76+
*
77+
* @param metadata Extra data about this operation. For example: { "stream": "table1",
78+
* "recordCount": 1000 }.
79+
*/
80+
public static <T, E extends Exception> T queryWithTracing(final String name,
81+
final ThrowingCallable<T, E> command,
82+
final Map<String, Object> metadata)
83+
throws E {
84+
final ISpan childSpan = createChildSpan(Sentry.getSpan(), name, metadata);
4985
try {
5086
final T result = command.call();
5187
childSpan.finish(SpanStatus.OK);
@@ -59,4 +95,18 @@ public static <T> T runWithSpan(final String operation, final Callable<T> comman
5995
}
6096
}
6197

98+
private static ISpan createChildSpan(final ISpan currentSpan, final String operationName, final Map<String, Object> metadata) {
99+
final String name = Strings.isBlank(operationName) ? DEFAULT_UNKNOWN_OPERATION : operationName;
100+
final ISpan childSpan;
101+
if (currentSpan == null) {
102+
childSpan = Sentry.startTransaction(DEFAULT_ROOT_TRANSACTION, operationName);
103+
} else {
104+
childSpan = currentSpan.startChild(operationName);
105+
}
106+
if (metadata != null && !metadata.isEmpty()) {
107+
metadata.forEach(childSpan::setData);
108+
}
109+
return childSpan;
110+
}
111+
62112
}

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.airbyte.integrations.base.AirbyteMessageConsumer;
1515
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
1616
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
17+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
1718
import io.airbyte.protocol.models.AirbyteMessage;
1819
import io.airbyte.protocol.models.AirbyteMessage.Type;
1920
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -146,10 +147,12 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
146147
// TODO use a more efficient way to compute bytes that doesn't require double serialization (records
147148
// are serialized again when writing to
148149
// the destination
149-
long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
150+
final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
150151
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
151152
LOGGER.info("Flushing buffer...");
152-
flushQueueToDestination();
153+
AirbyteSentry.executeWithTracing("FlushBuffer",
154+
this::flushQueueToDestination,
155+
Map.of("stream", stream.getName(), "namespace", stream.getNamespace(), "bufferSizeInBytes", bufferSizeInBytes));
153156
bufferSizeInBytes = 0;
154157
}
155158

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.airbyte.integrations.BaseConnector;
1212
import io.airbyte.integrations.base.AirbyteMessageConsumer;
1313
import io.airbyte.integrations.base.Destination;
14+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
1415
import io.airbyte.integrations.destination.NamingConventionTransformer;
1516
import io.airbyte.protocol.models.AirbyteConnectionStatus;
1617
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
@@ -54,7 +55,8 @@ public AirbyteConnectionStatus check(final JsonNode config) {
5455

5556
try (final JdbcDatabase database = getDatabase(config)) {
5657
final String outputSchema = namingResolver.getIdentifier(config.get("schema").asText());
57-
attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations);
58+
AirbyteSentry.executeWithTracing("CreateAndDropTable", () ->
59+
attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations));
5860
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
5961
} catch (final Exception e) {
6062
LOGGER.error("Exception while checking connection: ", e);

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.airbyte.commons.json.Jsons;
99
import io.airbyte.db.jdbc.JdbcDatabase;
1010
import io.airbyte.integrations.base.JavaBaseConstants;
11+
import io.airbyte.integrations.base.sentry.AirbyteSentry;
1112
import io.airbyte.protocol.models.AirbyteRecordMessage;
1213
import java.io.File;
1314
import java.io.PrintWriter;
@@ -16,6 +17,7 @@
1617
import java.sql.Timestamp;
1718
import java.time.Instant;
1819
import java.util.List;
20+
import java.util.Map;
1921
import java.util.UUID;
2022
import org.apache.commons.csv.CSVFormat;
2123
import org.apache.commons.csv.CSVPrinter;
@@ -31,7 +33,9 @@ public abstract class JdbcSqlOperations implements SqlOperations {
3133
@Override
3234
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
3335
if (!isSchemaExists(database, schemaName)) {
34-
database.execute(createSchemaQuery(schemaName));;
36+
AirbyteSentry.executeWithTracing("CreateSchema",
37+
() -> database.execute(createSchemaQuery(schemaName)),
38+
Map.of("schema", schemaName));
3539
}
3640
}
3741

@@ -41,7 +45,9 @@ private String createSchemaQuery(final String schemaName) {
4145

4246
@Override
4347
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
44-
database.execute(createTableQuery(database, schemaName, tableName));
48+
AirbyteSentry.executeWithTracing("CreateTableIfNotExists",
49+
() -> database.execute(createTableQuery(database, schemaName, tableName)),
50+
Map.of("schema", schemaName, "table", tableName));
4551
}
4652

4753
@Override
@@ -96,12 +102,16 @@ public void executeTransaction(final JdbcDatabase database, final List<String> q
96102
appendedQueries.append(query);
97103
}
98104
appendedQueries.append("COMMIT;");
99-
database.execute(appendedQueries.toString());
105+
AirbyteSentry.executeWithTracing("ExecuteTransactions",
106+
() -> database.execute(appendedQueries.toString()),
107+
Map.of("queries", queries));
100108
}
101109

102110
@Override
103111
public void dropTableIfExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
104-
database.execute(dropTableIfExistsQuery(schemaName, tableName));
112+
AirbyteSentry.executeWithTracing("DropTableIfExists",
113+
() -> database.execute(dropTableIfExistsQuery(schemaName, tableName)),
114+
Map.of("schema", schemaName, "table", tableName));
105115
}
106116

107117
private String dropTableIfExistsQuery(final String schemaName, final String tableName) {
@@ -124,8 +134,12 @@ public final void insertRecords(final JdbcDatabase database,
124134
final String schemaName,
125135
final String tableName)
126136
throws Exception {
127-
records.forEach(airbyteRecordMessage -> getDataAdapter().adapt(airbyteRecordMessage.getData()));
128-
insertRecordsInternal(database, records, schemaName, tableName);
137+
AirbyteSentry.executeWithTracing("InsertRecords",
138+
() -> {
139+
records.forEach(airbyteRecordMessage -> getDataAdapter().adapt(airbyteRecordMessage.getData()));
140+
insertRecordsInternal(database, records, schemaName, tableName);
141+
},
142+
Map.of("schema", schemaName, "table", tableName, "recordCount", records.size()));
129143
}
130144

131145
protected abstract void insertRecordsInternal(JdbcDatabase database,

0 commit comments

Comments
 (0)