Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres source: add integration with data dog #21533

Merged
merged 78 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
aaff3f3
Source postgres: add dd for env running locally
sashaNeshcheret Jan 13, 2023
7b8b49e
Source postgres: add dd for running in cloud
sashaNeshcheret Jan 18, 2023
688d363
auto-bump connector version
octavia-squidington-iii Jan 20, 2023
00270ec
Source postgres: bump postgres strict-encrypt version
sashaNeshcheret Jan 20, 2023
2f3fcae
Merge remote-tracking branch 'origin/omneshcheret/20423-add-dd-to-pos…
sashaNeshcheret Jan 20, 2023
88eb858
Source postgres: filter datadog agent env variables just for postgres…
sashaNeshcheret Jan 23, 2023
5139268
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Jan 23, 2023
2b9ef88
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Jan 24, 2023
fd01f0c
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Jan 24, 2023
c1909ad
Merge branch 'master' into omneshcheret/20423-add-dd-to-postgres-source
sashaNeshcheret Jan 24, 2023
7bb7e64
Source postgres: format
sashaNeshcheret Jan 24, 2023
0d886cd
Merge remote-tracking branch 'origin/omneshcheret/20423-add-dd-to-pos…
sashaNeshcheret Jan 24, 2023
7458c52
Source postgres: clean code
sashaNeshcheret Jan 30, 2023
b0ee119
Source postgres: pass java opts for all connectors
sashaNeshcheret Jan 30, 2023
94a7e83
Source postgres: temp removing dd agent from image
sashaNeshcheret Jan 31, 2023
7714c30
Source postgres: add dd agent to image
sashaNeshcheret Jan 31, 2023
8def0f2
Source postgres: temp revert adding dd env variable
sashaNeshcheret Jan 31, 2023
942046d
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Jan 31, 2023
f58ba5f
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Jan 31, 2023
0e1a33e
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Feb 1, 2023
89ac2e3
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Feb 1, 2023
23bcf93
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Feb 1, 2023
e4df10a
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Feb 1, 2023
519873c
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Feb 1, 2023
e2809f8
Source postgres: temp hardcoded dd env variable
sashaNeshcheret Feb 1, 2023
22cbf61
Source postgres: temp removing hardcoded dd env variable
sashaNeshcheret Feb 2, 2023
c868188
Source postgres: temp added hardcoded dd env variable
sashaNeshcheret Feb 6, 2023
3b22eac
Source postgres: temp added hardcoded dd env variable
sashaNeshcheret Feb 7, 2023
c6d7255
Source postgres: temp added hardcoded dd env variable
sashaNeshcheret Feb 7, 2023
9c8b721
Source postgres: rename to java_opts and pass data dog host
sashaNeshcheret Feb 7, 2023
26c3fda
Source postgres: add vars to kube pods
sashaNeshcheret Feb 8, 2023
f331d17
Source postgres: add vars to kube pods
sashaNeshcheret Feb 8, 2023
d926ac0
Source postgres: add vars to kube pods
sashaNeshcheret Feb 9, 2023
212e42c
Source postgres: add Trace to more methods
sashaNeshcheret Feb 10, 2023
b0988b8
Source postgres: add Trace to more methods
sashaNeshcheret Feb 10, 2023
e60cdf5
Source postgres: add Trace to more methods
sashaNeshcheret Feb 10, 2023
0541cd4
Source postgres: temp reverting service name removing
sashaNeshcheret Feb 12, 2023
bd4ebd4
Source postgres: temp reverting service name removing
sashaNeshcheret Feb 12, 2023
f2696a1
Source postgres: temp reverting service name removing
sashaNeshcheret Feb 14, 2023
eb88e58
Source postgres: temp adding trace to integration runner
sashaNeshcheret Feb 15, 2023
d2b6162
Source postgres: temp adding trace to integration runner
sashaNeshcheret Feb 15, 2023
f0d0242
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Feb 23, 2023
fd98657
Source postgres: bump postgres source dd version
sashaNeshcheret Feb 23, 2023
9aec506
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Feb 28, 2023
2ba8413
Source postgres: bump postgres source dd version
sashaNeshcheret Feb 28, 2023
11121d4
Source postgres: revert temp changes
sashaNeshcheret Mar 1, 2023
25f7aa1
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 1, 2023
fb8cf98
Source postgres: merge with master
sashaNeshcheret Mar 1, 2023
07be478
Automated Commit - Formatting Changes
sashaNeshcheret Mar 1, 2023
bfd725b
Source postgres: move dd java agent to base java
sashaNeshcheret Mar 2, 2023
e8442b5
Source postgres: move dd java agent to base java
sashaNeshcheret Mar 2, 2023
823205d
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 6, 2023
31e89d6
Merge branch 'master' into omneshcheret/20423-add-dd-to-postgres-source
sashaNeshcheret Mar 6, 2023
211cef0
Source postgres: clean up
sashaNeshcheret Mar 6, 2023
e5b6bab
Merge remote-tracking branch 'origin/omneshcheret/20423-add-dd-to-pos…
sashaNeshcheret Mar 6, 2023
d26257d
Source postgres: clean up
sashaNeshcheret Mar 6, 2023
1dfa6a6
Merge branch 'master' into omneshcheret/20423-add-dd-to-postgres-source
sashaNeshcheret Mar 6, 2023
6e46843
Automated Change
sashaNeshcheret Mar 6, 2023
d683648
Source postgres: clean up
sashaNeshcheret Mar 8, 2023
ed62179
Merge remote-tracking branch 'origin/omneshcheret/20423-add-dd-to-pos…
sashaNeshcheret Mar 8, 2023
ea80d7b
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 10, 2023
1be6928
Source postgres: bump version
sashaNeshcheret Mar 10, 2023
ed3d2d1
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 13, 2023
c466f5a
Source postgres: bump version for test
sashaNeshcheret Mar 13, 2023
a83fbe9
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 16, 2023
988d6d8
Source postgres: temp bump version
sashaNeshcheret Mar 16, 2023
47f4bf8
Source postgres: bump version
sashaNeshcheret Mar 17, 2023
8cd44ff
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 17, 2023
595e26f
Automated Change
sashaNeshcheret Mar 17, 2023
eef635c
Merge branch 'master' into omneshcheret/20423-add-dd-to-postgres-source
sashaNeshcheret Mar 17, 2023
74743d2
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 20, 2023
0870216
Merge remote-tracking branch 'origin/omneshcheret/20423-add-dd-to-pos…
sashaNeshcheret Mar 20, 2023
a2fad69
Merge branch 'master' into omneshcheret/20423-add-dd-to-postgres-source
sashaNeshcheret Mar 20, 2023
790af75
Merge branch 'master' into omneshcheret/20423-add-dd-to-postgres-source
sashaNeshcheret Mar 21, 2023
7dfaf34
Merge remote-tracking branch 'origin/master' into omneshcheret/20423-…
sashaNeshcheret Mar 21, 2023
a48d555
Source postgres: bump version
sashaNeshcheret Mar 21, 2023
cf010d9
Merge remote-tracking branch 'origin/omneshcheret/20423-add-dd-to-pos…
sashaNeshcheret Mar 21, 2023
d818947
auto-bump connector version
octavia-squidington-iii Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 2.0.4
dockerImageTag: 2.0.5
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11882,7 +11882,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:2.0.4"
- dockerImage: "airbyte/source-postgres:2.0.5"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
5 changes: 4 additions & 1 deletion airbyte-integrations/bases/base-java/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ RUN yum install -y tar openssl && yum clean all

WORKDIR /airbyte

# Add the Datadog Java APM agent
ADD https://dtdg.co/latest-java-tracer dd-java-agent.jar
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that we are using the in-process collector or will we be forwarding metrics to the DD collector pod?


COPY javabase.sh .

# airbyte base commands
Expand All @@ -18,5 +21,5 @@ ENV AIRBYTE_WRITE_CMD "/airbyte/javabase.sh --write"
ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"
ENTRYPOINT ["/airbyte/base.sh"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/integration-base-java
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation libs.jackson.annotations
implementation libs.connectors.testcontainers
implementation libs.connectors.testcontainers.jdbc
implementation libs.bundles.datadog

implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.util.ApmTraceUtils;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -92,6 +94,7 @@ public IntegrationRunner(final Source source) {
validator = jsonSchemaValidator;
}

@Trace(operationName = "RUN_OPERATION")
public void run(final String[] args) throws Exception {
final IntegrationConfig parsed = cliParser.parse(args);
try {
Expand Down Expand Up @@ -155,6 +158,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
// to
// find the root exception that corresponds to a configuration error. If that does not exist, we
// just return the original exception.
ApmTraceUtils.addExceptionToTrace(e);
final Throwable rootThrowable = ConnectorExceptionUtil.getRootConfigError(e);
final String displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable);
// If the source connector throws a config error, a trace message with the relevant message should
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import io.opentracing.Span;
import io.opentracing.log.Fields;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;

/**
* Collection of utility methods to help with performance tracing.
*/
public class ApmTraceUtils {

/**
* String format for the name of tags added to spans.
*/
public static final String TAG_FORMAT = "airbyte.%s.%s";

/**
* Standard prefix for tags added to spans.
*/
public static final String TAG_PREFIX = "metadata";

/**
* Adds all the provided tags to the currently active span, if one exists. <br />
* All tags added via this method will use the default {@link #TAG_PREFIX} namespace.
*
* @param tags A map of tags to be added to the currently active span.
*/
public static void addTagsToTrace(final Map<String, Object> tags) {
addTagsToTrace(tags, TAG_PREFIX);
}

/**
* Adds all provided tags to the currently active span, if one exists, under the provided tag name
* namespace.
*
* @param tags A map of tags to be added to the currently active span.
* @param tagPrefix The prefix to be added to each custom tag name.
*/
public static void addTagsToTrace(final Map<String, Object> tags, final String tagPrefix) {
addTagsToTrace(GlobalTracer.get().activeSpan(), tags, tagPrefix);
}

/**
* Adds all the provided tags to the provided span, if one exists.
*
* @param span The {@link Span} that will be associated with the tags.
* @param tags A map of tags to be added to the currently active span.
* @param tagPrefix The prefix to be added to each custom tag name.
*/
public static void addTagsToTrace(final Span span, final Map<String, Object> tags, final String tagPrefix) {
if (span != null) {
tags.entrySet().forEach(entry -> {
span.setTag(formatTag(entry.getKey(), tagPrefix), entry.getValue().toString());
});
}
}

/**
* Adds an exception to the currently active span, if one exists.
*
* @param t The {@link Throwable} to be added to the currently active span.
*/
public static void addExceptionToTrace(final Throwable t) {
addExceptionToTrace(GlobalTracer.get().activeSpan(), t);
}

/**
* Adds an exception to the provided span, if one exists.
*
* @param span The {@link Span} that will be associated with the exception.
* @param t The {@link Throwable} to be added to the provided span.
*/
public static void addExceptionToTrace(final Span span, final Throwable t) {
if (span != null) {
span.setTag(Tags.ERROR, true);
span.log(Map.of(Fields.ERROR_OBJECT, t));
}
}

/**
* Adds all the provided tags to the root span.
*
* @param tags A map of tags to be added to the root span.
*/
public static void addTagsToRootSpan(final Map<String, Object> tags) {
final Span activeSpan = GlobalTracer.get().activeSpan();
if (activeSpan instanceof MutableSpan) {
final MutableSpan localRootSpan = ((MutableSpan) activeSpan).getLocalRootSpan();
tags.entrySet().forEach(entry -> {
localRootSpan.setTag(formatTag(entry.getKey(), TAG_PREFIX), entry.getValue().toString());
});
}
}

/**
* Adds an exception to the root span, if an active one exists.
*
* @param t The {@link Throwable} to be added to the provided span.
*/
public static void recordErrorOnRootSpan(final Throwable t) {
final Span activeSpan = GlobalTracer.get().activeSpan();
if (activeSpan != null) {
activeSpan.setTag(Tags.ERROR, true);
activeSpan.log(Map.of(Fields.ERROR_OBJECT, t));
}
if (activeSpan instanceof MutableSpan) {
final MutableSpan localRootSpan = ((MutableSpan) activeSpan).getLocalRootSpan();
localRootSpan.setError(true);
localRootSpan.setTag(DDTags.ERROR_MSG, t.getMessage());
localRootSpan.setTag(DDTags.ERROR_TYPE, t.getClass().getName());
final StringWriter errorString = new StringWriter();
t.printStackTrace(new PrintWriter(errorString));
localRootSpan.setTag(DDTags.ERROR_STACK, errorString.toString());
}
}

/**
* Formats the tag key using {@link #TAG_FORMAT} provided by this utility, using the default tag
* prefix {@link #TAG_PREFIX}.
*
* @param tagKey The tag key to format.
* @return The formatted tag key.
*/
public static String formatTag(final String tagKey) {
return formatTag(tagKey, TAG_PREFIX);
}

/**
* Formats the tag key using {@link #TAG_FORMAT} provided by this utility with the provided tag
* prefix.
*
* @param tagKey The tag key to format.
* @param tagPrefix The prefix to be added to each custom tag name.
* @return The formatted tag key.
*/
public static String formatTag(final String tagKey, final String tagPrefix) {
return String.format(TAG_FORMAT, tagPrefix, tagKey);
}

}
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'org.apache.commons:commons-lang3:3.11'
implementation libs.bundles.datadog

testImplementation project(':airbyte-test-utils')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import datadog.trace.api.Trace;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
Expand Down Expand Up @@ -114,6 +115,7 @@ protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDataba
*
* @return list of consumers that run queries for the check command.
*/
@Trace(operationName = CHECK_TRACE_OPERATION_NAME)
protected List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
return ImmutableList.of(database -> {
LOGGER.info("Attempting to get metadata from the database to see if we can connect.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.4
LABEL io.airbyte.version=2.0.5
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.4
LABEL io.airbyte.version=2.0.5
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {

implementation 'org.apache.commons:commons-lang3:3.11'
implementation libs.postgresql
implementation libs.bundles.datadog

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airbyte.commons.exceptions.ConfigErrorException;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
Expand Down Expand Up @@ -57,6 +58,7 @@
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.util.ApmTraceUtils;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
Expand Down Expand Up @@ -212,6 +214,7 @@ protected Set<String> getExcludedViews() {
}

@Override
@Trace(operationName = DISCOVER_TRACE_OPERATION_NAME)
public AirbyteCatalog discover(final JsonNode config) throws Exception {
final AirbyteCatalog catalog = super.discover(config);

Expand Down Expand Up @@ -281,6 +284,7 @@ List<JsonNode> getReplicationSlot(final JdbcDatabase database, final JsonNode co
}
}

@Trace(operationName = CHECK_TRACE_OPERATION_NAME)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same : do we need this annotation for every method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same case, by the way we can appropriately configure it after using and understanding what we really need to trace.

@Override
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config)
throws Exception {
Expand Down Expand Up @@ -503,6 +507,7 @@ public static void main(final String[] args) throws Exception {
}

@Override
@Trace(operationName = CHECK_TRACE_OPERATION_NAME)
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':airbyte-config:config-models')

implementation 'org.apache.commons:commons-lang3:3.11'
implementation libs.bundles.datadog

testImplementation project(':airbyte-test-utils')

Expand Down
Loading