-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Postgres source: add integration with data dog #21533
Changes from all commits
aaff3f3
7b8b49e
688d363
00270ec
2f3fcae
88eb858
5139268
2b9ef88
fd01f0c
c1909ad
7bb7e64
0d886cd
7458c52
b0ee119
94a7e83
7714c30
8def0f2
942046d
f58ba5f
0e1a33e
89ac2e3
23bcf93
e4df10a
519873c
e2809f8
22cbf61
c868188
3b22eac
c6d7255
9c8b721
26c3fda
f331d17
d926ac0
212e42c
b0988b8
e60cdf5
0541cd4
bd4ebd4
f2696a1
eb88e58
d2b6162
f0d0242
fd98657
9aec506
2ba8413
11121d4
25f7aa1
fb8cf98
07be478
bfd725b
e8442b5
823205d
31e89d6
211cef0
e5b6bab
d26257d
1dfa6a6
6e46843
d683648
ed62179
ea80d7b
1be6928
ed3d2d1
c466f5a
a83fbe9
988d6d8
47f4bf8
8cd44ff
595e26f
eef635c
74743d2
0870216
a2fad69
790af75
7dfaf34
a48d555
cf010d9
d818947
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.util; | ||
|
||
import datadog.trace.api.DDTags; | ||
import datadog.trace.api.interceptor.MutableSpan; | ||
import io.opentracing.Span; | ||
import io.opentracing.log.Fields; | ||
import io.opentracing.tag.Tags; | ||
import io.opentracing.util.GlobalTracer; | ||
import java.io.PrintWriter; | ||
import java.io.StringWriter; | ||
import java.util.Map; | ||
|
||
/** | ||
* Collection of utility methods to help with performance tracing. | ||
*/ | ||
public class ApmTraceUtils { | ||
|
||
/** | ||
* String format for the name of tags added to spans. | ||
*/ | ||
public static final String TAG_FORMAT = "airbyte.%s.%s"; | ||
|
||
/** | ||
* Standard prefix for tags added to spans. | ||
*/ | ||
public static final String TAG_PREFIX = "metadata"; | ||
|
||
/** | ||
* Adds all the provided tags to the currently active span, if one exists. <br /> | ||
* All tags added via this method will use the default {@link #TAG_PREFIX} namespace. | ||
* | ||
* @param tags A map of tags to be added to the currently active span. | ||
*/ | ||
public static void addTagsToTrace(final Map<String, Object> tags) { | ||
addTagsToTrace(tags, TAG_PREFIX); | ||
} | ||
|
||
/** | ||
* Adds all provided tags to the currently active span, if one exists, under the provided tag name | ||
* namespace. | ||
* | ||
* @param tags A map of tags to be added to the currently active span. | ||
* @param tagPrefix The prefix to be added to each custom tag name. | ||
*/ | ||
public static void addTagsToTrace(final Map<String, Object> tags, final String tagPrefix) { | ||
addTagsToTrace(GlobalTracer.get().activeSpan(), tags, tagPrefix); | ||
} | ||
|
||
/** | ||
* Adds all the provided tags to the provided span, if one exists. | ||
* | ||
* @param span The {@link Span} that will be associated with the tags. | ||
* @param tags A map of tags to be added to the currently active span. | ||
* @param tagPrefix The prefix to be added to each custom tag name. | ||
*/ | ||
public static void addTagsToTrace(final Span span, final Map<String, Object> tags, final String tagPrefix) { | ||
if (span != null) { | ||
tags.entrySet().forEach(entry -> { | ||
span.setTag(formatTag(entry.getKey(), tagPrefix), entry.getValue().toString()); | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* Adds an exception to the currently active span, if one exists. | ||
* | ||
* @param t The {@link Throwable} to be added to the currently active span. | ||
*/ | ||
public static void addExceptionToTrace(final Throwable t) { | ||
addExceptionToTrace(GlobalTracer.get().activeSpan(), t); | ||
} | ||
|
||
/** | ||
* Adds an exception to the provided span, if one exists. | ||
* | ||
* @param span The {@link Span} that will be associated with the exception. | ||
* @param t The {@link Throwable} to be added to the provided span. | ||
*/ | ||
public static void addExceptionToTrace(final Span span, final Throwable t) { | ||
if (span != null) { | ||
span.setTag(Tags.ERROR, true); | ||
span.log(Map.of(Fields.ERROR_OBJECT, t)); | ||
} | ||
} | ||
|
||
/** | ||
* Adds all the provided tags to the root span. | ||
* | ||
* @param tags A map of tags to be added to the root span. | ||
*/ | ||
public static void addTagsToRootSpan(final Map<String, Object> tags) { | ||
final Span activeSpan = GlobalTracer.get().activeSpan(); | ||
if (activeSpan instanceof MutableSpan) { | ||
final MutableSpan localRootSpan = ((MutableSpan) activeSpan).getLocalRootSpan(); | ||
tags.entrySet().forEach(entry -> { | ||
localRootSpan.setTag(formatTag(entry.getKey(), TAG_PREFIX), entry.getValue().toString()); | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* Adds an exception to the root span, if an active one exists. | ||
* | ||
* @param t The {@link Throwable} to be added to the provided span. | ||
*/ | ||
public static void recordErrorOnRootSpan(final Throwable t) { | ||
final Span activeSpan = GlobalTracer.get().activeSpan(); | ||
if (activeSpan != null) { | ||
activeSpan.setTag(Tags.ERROR, true); | ||
activeSpan.log(Map.of(Fields.ERROR_OBJECT, t)); | ||
} | ||
if (activeSpan instanceof MutableSpan) { | ||
final MutableSpan localRootSpan = ((MutableSpan) activeSpan).getLocalRootSpan(); | ||
localRootSpan.setError(true); | ||
localRootSpan.setTag(DDTags.ERROR_MSG, t.getMessage()); | ||
localRootSpan.setTag(DDTags.ERROR_TYPE, t.getClass().getName()); | ||
final StringWriter errorString = new StringWriter(); | ||
t.printStackTrace(new PrintWriter(errorString)); | ||
localRootSpan.setTag(DDTags.ERROR_STACK, errorString.toString()); | ||
} | ||
} | ||
|
||
/** | ||
* Formats the tag key using {@link #TAG_FORMAT} provided by this utility, using the default tag | ||
* prefix {@link #TAG_PREFIX}. | ||
* | ||
* @param tagKey The tag key to format. | ||
* @return The formatted tag key. | ||
*/ | ||
public static String formatTag(final String tagKey) { | ||
return formatTag(tagKey, TAG_PREFIX); | ||
} | ||
|
||
/** | ||
* Formats the tag key using {@link #TAG_FORMAT} provided by this utility with the provided tag | ||
* prefix. | ||
* | ||
* @param tagKey The tag key to format. | ||
* @param tagPrefix The prefix to be added to each custom tag name. | ||
* @return The formatted tag key. | ||
*/ | ||
public static String formatTag(final String tagKey, final String tagPrefix) { | ||
return String.format(TAG_FORMAT, tagPrefix, tagKey); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.collect.ImmutableSet; | ||
import io.airbyte.commons.exceptions.ConfigErrorException; | ||
import datadog.trace.api.Trace; | ||
import io.airbyte.commons.features.EnvVariableFeatureFlags; | ||
import io.airbyte.commons.features.FeatureFlags; | ||
import io.airbyte.commons.functional.CheckedConsumer; | ||
|
@@ -57,6 +58,7 @@ | |
import io.airbyte.integrations.source.relationaldb.models.CdcState; | ||
import io.airbyte.integrations.source.relationaldb.models.DbState; | ||
import io.airbyte.integrations.source.relationaldb.state.StateManager; | ||
import io.airbyte.integrations.util.ApmTraceUtils; | ||
import io.airbyte.integrations.util.HostPortResolver; | ||
import io.airbyte.protocol.models.CommonField; | ||
import io.airbyte.protocol.models.v0.AirbyteCatalog; | ||
|
@@ -212,6 +214,7 @@ protected Set<String> getExcludedViews() { | |
} | ||
|
||
@Override | ||
@Trace(operationName = DISCOVER_TRACE_OPERATION_NAME) | ||
public AirbyteCatalog discover(final JsonNode config) throws Exception { | ||
final AirbyteCatalog catalog = super.discover(config); | ||
|
||
|
@@ -281,6 +284,7 @@ List<JsonNode> getReplicationSlot(final JdbcDatabase database, final JsonNode co | |
} | ||
} | ||
|
||
@Trace(operationName = CHECK_TRACE_OPERATION_NAME) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same : do we need this annotation for every method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same case, by the way we can appropriately configure it after using and understanding what we really need to trace. |
||
@Override | ||
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) | ||
throws Exception { | ||
|
@@ -503,6 +507,7 @@ public static void main(final String[] args) throws Exception { | |
} | ||
|
||
@Override | ||
@Trace(operationName = CHECK_TRACE_OPERATION_NAME) | ||
public AirbyteConnectionStatus check(final JsonNode config) throws Exception { | ||
if (PostgresUtils.isCdc(config)) { | ||
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that we are using the in-process collector or will we be forwarding metrics to the DD collector pod?