From 36549cda694a29c41378c72964cd0708835e1f4d Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Feb 2022 21:51:59 +0800 Subject: [PATCH 1/8] Checkpoint: Get create job metric working. --- .../config/persistence/ConfigRepository.java | 5 +- airbyte-metrics/lib/build.gradle | 2 + .../metrics/lib/AirbyteMetricsRegistry.java | 4 ++ .../metrics/lib/DogStatsDMetricSingleton.java | 8 ++-- .../metrics/lib/MetricEmittingApps.java | 2 +- .../io/airbyte/metrics/lib/MetricTags.java | 14 ++++++ .../airbyte/metrics/lib/MetricsQueries.java | 47 +++++++++++++++++++ airbyte-workers/build.gradle | 1 + ...obCreationAndStatusUpdateActivityImpl.java | 22 ++++++++- ...obCreationAndStatusUpdateActivityTest.java | 14 +++++- 10 files changed, 110 insertions(+), 9 deletions(-) create mode 100644 airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java create mode 100644 airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 5e3ef890edee5..718f379069071 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -77,6 +77,10 @@ public ConfigRepository(final ConfigPersistence persistence, this.database = new ExceptionWrappingDatabase(database); } + public ExceptionWrappingDatabase getDatabase() { + return database; + } + public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone) throws JsonValidationException, IOException, ConfigNotFoundException { final StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), StandardWorkspace.class); @@ -84,7 +88,6 @@ public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final bool if (!MoreBooleans.isTruthy(workspace.getTombstone()) || includeTombstone) { return workspace; } - throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString()); } diff --git a/airbyte-metrics/lib/build.gradle b/airbyte-metrics/lib/build.gradle index 1941dfd836fa6..2137bceeb87c4 100644 --- a/airbyte-metrics/lib/build.gradle +++ b/airbyte-metrics/lib/build.gradle @@ -5,6 +5,8 @@ plugins { dependencies { implementation project(':airbyte-commons') implementation project(':airbyte-config:models') + implementation project(':airbyte-db:jooq') + implementation project(':airbyte-db:lib') implementation 'com.datadoghq:java-dogstatsd-client:4.0.0' } diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java index 515abfbca1af1..2d0ab53361fd9 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java @@ -30,6 +30,10 @@ */ public enum AirbyteMetricsRegistry { + JOB_CREATED( + MetricEmittingApps.WORKER, + "job_created", + "increments each time a job is created"), KUBE_POD_PROCESS_CREATE_TIME_MILLISECS( MetricEmittingApps.WORKER, "kube_pod_process_create_time_millisecs", diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java index e327309ab9c5d..fe5276a6f47b3 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java @@ -70,7 +70,7 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt, return; } - log.info("publishing count, name: {}, value: {}", metric.metricName, amt); + log.info("publishing count, name: {}, value: {}, tags: {}", metric.metricName, amt, tags); statsDClient.count(metric.metricName, amt, tags); } } @@ -90,7 +90,7 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val, return; } - log.info("publishing gauge, name: {}, value: {}", metric, val); + log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags); statsDClient.gauge(metric.metricName, val, tags); } } @@ -117,7 +117,7 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do return; } - log.info("recording histogram, name: {}, value: {}", metric.metricName, val); + log.info("recording histogram, name: {}, value: {}, tags: {}", metric.metricName, val, tags); statsDClient.histogram(metric.metricName, val, tags); } } @@ -138,7 +138,7 @@ public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final d return; } - log.info("recording distribution, name: {}, value: {}", metric.metricName, val); + log.info("recording distribution, name: {}, value: {}, tags: {}", metric.metricName, val, tags); statsDClient.distribution(metric.metricName, val, tags); } } diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java index 104af5835bd22..cde70fcf671ab 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java @@ -8,7 +8,7 @@ /** * Enum containing all applications metrics are emitted for. Used to initialize - * {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, boolean)}. + * {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, DatadogClientConfiguration)}. * * Application Name Conventions: *

diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java new file mode 100644 index 0000000000000..f3d74e67a4084 --- /dev/null +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +/** + * Keep track of all metric tags. + */ +public class MetricTags { + + public static final String RELEASE_STAGE = "release_stage:"; + +} diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java new file mode 100644 index 0000000000000..b0f8414d7c4eb --- /dev/null +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; + +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import java.util.List; +import java.util.UUID; +import org.jooq.DSLContext; + +/** + * Keep track of all metric queries. + */ +public class MetricsQueries { + + public static final String JOB_ID_TO_RELEASE_STAGES = + new String(""" + with + joined_data as + ( select dest_def_data.release_stage as dest_def_id, + src_def_data.release_stage as src_def_id, + -- connection.id as conn_id, + row_number() over ( + partition by + dest_data.actor_definition_id, + src_data.actor_definition_id + ) as row_num + from connection + inner join jobs on connection.id=CAST(jobs.scope AS uuid) + inner join actor as dest_data on connection.destination_id = dest_data.id + inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id + inner join actor as src_data on connection.source_id = src_data.id + inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id + where jobs.id = '') + select * from joined_data where joined_data.row_num = 1"""); + + public static List srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) { + return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) + .where(ACTOR.ID.eq(srcId)) + .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE); + } + +} diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 9966c042b93e4..38aca08cad03f 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -24,6 +24,7 @@ dependencies { implementation project(':airbyte-commons-docker') implementation project(':airbyte-config:models') implementation project(':airbyte-config:persistence') + implementation project(':airbyte-db:jooq') implementation project(':airbyte-db:lib') implementation project(':airbyte-metrics:lib') implementation project(':airbyte-json-validation') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index b64b8197c7174..4293c4aae1994 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -17,6 +17,11 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import io.airbyte.metrics.lib.AirbyteMetricsRegistry; +import io.airbyte.metrics.lib.DogStatsDMetricSingleton; +import io.airbyte.metrics.lib.MetricTags; +import io.airbyte.metrics.lib.MetricsQueries; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; @@ -53,8 +58,8 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta @Override public JobCreationOutput createNewJob(final JobCreationInput input) { try { + final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); if (input.isReset()) { - final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); @@ -80,6 +85,15 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { final long jobId = jobFactory.create(input.getConnectionId()); log.info("New job created, with id: " + jobId); + final var srcId = standardSync.getSourceId(); + final var destId = standardSync.getDestinationId(); + + final var releaseStages = configRepository.getDatabase().query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, destId)); + if (releaseStages != null) { + for (final ReleaseStage stage : releaseStages) { + DogStatsDMetricSingleton.count(AirbyteMetricsRegistry.JOB_CREATED, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + } + } return new JobCreationOutput(jobId); } @@ -157,7 +171,6 @@ public void jobCancelled(final JobCancelledInput input) { jobPersistence.cancelJob(input.getJobId()); jobPersistence.failAttempt(input.getJobId(), input.getAttemptId()); jobPersistence.writeAttemptFailureSummary(input.getJobId(), input.getAttemptId(), input.getAttemptFailureSummary()); - final Job job = jobPersistence.getJob(input.getJobId()); trackCompletion(job, JobStatus.FAILED); jobNotifier.failJob("Job was cancelled", job); @@ -180,4 +193,9 @@ private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus s jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); } + // job -> connection -> source_id & destination_id -> actor_definition -> release stage + private void jobToReleaseStages() { + // config repository might be related to this. + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 097aabd0947ed..5110f7261bc7b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -9,17 +9,22 @@ import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; +import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; @@ -70,6 +75,9 @@ public class JobCreationAndStatusUpdateActivityTest { @Mock private JobTracker mJobtracker; + @Mock + private ConfigRepository mConfigRepository; + @InjectMocks private JobCreationAndStatusUpdateActivityImpl jobCreationAndStatusUpdateActivity; @@ -93,9 +101,13 @@ class Creation { @Test @DisplayName("Test job creation") - public void createJob() { + public void createJob() throws JsonValidationException, ConfigNotFoundException, IOException { Mockito.when(mJobFactory.create(CONNECTION_ID)) .thenReturn(JOB_ID); + Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)) + .thenReturn(Mockito.mock(StandardSync.class)); + Mockito.when(mConfigRepository.getDatabase()) + .thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID, false)); From c4102ef62a91f33503b66edc95fef8b835dcc0c9 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Feb 2022 21:56:14 +0800 Subject: [PATCH 2/8] Add tags class. --- .../java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java | 6 +++--- .../activities/JobCreationAndStatusUpdateActivityImpl.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java index 2d0ab53361fd9..7ed3cecc352cc 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java @@ -30,10 +30,10 @@ */ public enum AirbyteMetricsRegistry { - JOB_CREATED( + JOB_CREATED_BY_RELEASE_STAGE( MetricEmittingApps.WORKER, - "job_created", - "increments each time a job is created"), + "job_created_by_release_stage", + "increments when a new job is created. jobs are double counted as this is tagged by release stage."), KUBE_POD_PROCESS_CREATE_TIME_MILLISECS( MetricEmittingApps.WORKER, "kube_pod_process_create_time_millisecs", diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 4293c4aae1994..c7c56a6682f31 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -91,7 +91,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { final var releaseStages = configRepository.getDatabase().query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, destId)); if (releaseStages != null) { for (final ReleaseStage stage : releaseStages) { - DogStatsDMetricSingleton.count(AirbyteMetricsRegistry.JOB_CREATED, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + DogStatsDMetricSingleton.count(AirbyteMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); } } From ed433fa6243d70101d6a67607459bfd64068e9a1 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 24 Feb 2022 23:16:05 +0800 Subject: [PATCH 3/8] Add tests. --- airbyte-metrics/lib/build.gradle | 3 + .../airbyte/metrics/lib/MetricsQueries.java | 38 +++---- .../metrics/llib/MetrisQueriesTest.java | 102 ++++++++++++++++++ 3 files changed, 124 insertions(+), 19 deletions(-) create mode 100644 airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java diff --git a/airbyte-metrics/lib/build.gradle b/airbyte-metrics/lib/build.gradle index 2137bceeb87c4..fddad3b6e31ee 100644 --- a/airbyte-metrics/lib/build.gradle +++ b/airbyte-metrics/lib/build.gradle @@ -9,4 +9,7 @@ dependencies { implementation project(':airbyte-db:lib') implementation 'com.datadoghq:java-dogstatsd-client:4.0.0' + + testImplementation project(':airbyte-config:persistence') + testImplementation 'org.testcontainers:postgresql:1.15.3' } diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java index b0f8414d7c4eb..949fa372f5abb 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java @@ -18,25 +18,25 @@ public class MetricsQueries { public static final String JOB_ID_TO_RELEASE_STAGES = - new String(""" - with - joined_data as - ( select dest_def_data.release_stage as dest_def_id, - src_def_data.release_stage as src_def_id, - -- connection.id as conn_id, - row_number() over ( - partition by - dest_data.actor_definition_id, - src_data.actor_definition_id - ) as row_num - from connection - inner join jobs on connection.id=CAST(jobs.scope AS uuid) - inner join actor as dest_data on connection.destination_id = dest_data.id - inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id - inner join actor as src_data on connection.source_id = src_data.id - inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id - where jobs.id = '') - select * from joined_data where joined_data.row_num = 1"""); + """ + with + joined_data as + ( select dest_def_data.release_stage as dest_def_id, + src_def_data.release_stage as src_def_id, + -- connection.id as conn_id, + row_number() over ( + partition by + dest_data.actor_definition_id, + src_data.actor_definition_id + ) as row_num + from connection + inner join jobs on connection.id=CAST(jobs.scope AS uuid) + inner join actor as dest_data on connection.destination_id = dest_data.id + inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id + inner join actor as src_data on connection.source_id = src_data.id + inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id + where jobs.id = '') + select * from joined_data where joined_data.row_num = 1"""; public static List srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) { return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java new file mode 100644 index 0000000000000..317c5f6ff2961 --- /dev/null +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.llib; + +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.jooq.enums.ActorType; +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import io.airbyte.db.instance.test.TestDatabaseProviders; +import io.airbyte.metrics.lib.MetricsQueries; +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import org.jooq.JSONB; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class MetrisQueriesTest { + + private static final String USER = "user"; + private static final String PASS = "hunter2"; + + private static PostgreSQLContainer container; + private static Database configDb; + private static DatabaseConfigPersistence configPersistence; + + @BeforeAll + static void setUpAll() throws IOException { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withUsername(USER) + .withPassword(PASS); + container.start(); + + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + configDb = databaseProviders.createNewConfigsDatabase(); + new DatabaseConfigPersistence(configDb); + } + + @Nested + class srcIdAndDestIdToReleaseStages { + + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(ACTOR)); + configDb.transaction(ctx -> ctx.truncate(ACTOR_DEFINITION)); + } + + @Test + @DisplayName("should return the right release stages") + public void shouldReturnReleaseStages() throws SQLException { + final var srcDefId = UUID.randomUUID(); + final var dstDefId = UUID.randomUUID(); + final var srcId = UUID.randomUUID(); + final var dstId = UUID.randomUUID(); + + // create src and dst def + configDb.transaction(ctx -> ctx + .insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, + ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) + .values(srcDefId, "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) + .values(dstDefId, "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) + .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute()); + // drop foreign key to simplify set up + configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute()); + // create src and dst + configDb.transaction( + ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) + .values(srcId, UUID.randomUUID(), srcDefId, "src", JSONB.valueOf("{}"), ActorType.source) + .values(dstId, UUID.randomUUID(), dstDefId, "dst", JSONB.valueOf("{}"), ActorType.destination) + .execute()); + final var res = configDb.query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); + assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res); + } + + @Test + @DisplayName("should not error out or return any result if not applicable") + public void shouldReturnNothingIfNotApplicable() throws SQLException { + configDb.transaction(ctx -> ctx + .insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, + ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) + .values(UUID.randomUUID(), "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) + .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) + .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute()); + + final var res = configDb.query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, UUID.randomUUID(), UUID.randomUUID())); + assertEquals(0, res.size()); + } + + } + +} From 801ab3f3dfe3f042e5d9825e3bf8f4dbdee3feae Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 25 Feb 2022 02:36:15 +0800 Subject: [PATCH 4/8] Add tests for jobIdToReleaseStages function and simplify SQL. --- .../io/airbyte/metrics/lib/MetricQueries.java | 47 ++++++++ .../airbyte/metrics/lib/MetricsQueries.java | 47 -------- .../metrics/llib/MetrisQueriesTest.java | 105 ++++++++++++------ ...obCreationAndStatusUpdateActivityImpl.java | 30 ++--- 4 files changed, 133 insertions(+), 96 deletions(-) create mode 100644 airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java delete mode 100644 airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java new file mode 100644 index 0000000000000..711c2f90ac796 --- /dev/null +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; + +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import java.util.List; +import java.util.UUID; +import org.jooq.DSLContext; + +/** + * Keep track of all metric queries. + */ +public class MetricQueries { + + public static List jobIdToReleaseStages(final DSLContext ctx, final long jobId) { + final var srcRelStageCol = "src_release_stage"; + final var dstRelStageCol = "dst_release_stage"; + + final var query = String.format(""" + select src_def_data.release_stage as %s, + dest_def_data.release_stage as %s + from connection + inner join jobs on connection.id=CAST(jobs.scope AS uuid) + inner join actor as dest_data on connection.destination_id = dest_data.id + inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id + inner join actor as src_data on connection.source_id = src_data.id + inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id + where jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId); + + final var res = ctx.fetch(query); + final var stages = res.getValues(srcRelStageCol, ReleaseStage.class); + stages.addAll(res.getValues(dstRelStageCol, ReleaseStage.class)); + return stages; + } + + public static List srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) { + return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) + .where(ACTOR.ID.eq(srcId)) + .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE); + } + +} diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java deleted file mode 100644 index 949fa372f5abb..0000000000000 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsQueries.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.metrics.lib; - -import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; -import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; - -import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; -import java.util.List; -import java.util.UUID; -import org.jooq.DSLContext; - -/** - * Keep track of all metric queries. - */ -public class MetricsQueries { - - public static final String JOB_ID_TO_RELEASE_STAGES = - """ - with - joined_data as - ( select dest_def_data.release_stage as dest_def_id, - src_def_data.release_stage as src_def_id, - -- connection.id as conn_id, - row_number() over ( - partition by - dest_data.actor_definition_id, - src_data.actor_definition_id - ) as row_num - from connection - inner join jobs on connection.id=CAST(jobs.scope AS uuid) - inner join actor as dest_data on connection.destination_id = dest_data.id - inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id - inner join actor as src_data on connection.source_id = src_data.id - inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id - where jobs.id = '') - select * from joined_data where joined_data.row_num = 1"""; - - public static List srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) { - return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) - .where(ACTOR.ID.eq(srcId)) - .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE); - } - -} diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java index 317c5f6ff2961..8579e6b336664 100644 --- a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java @@ -6,14 +6,16 @@ import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; +import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION; +import static io.airbyte.db.instance.jobs.jooq.Tables.*; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.db.Database; import io.airbyte.db.instance.configs.jooq.enums.ActorType; +import io.airbyte.db.instance.configs.jooq.enums.NamespaceDefinitionType; import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; import io.airbyte.db.instance.test.TestDatabaseProviders; -import io.airbyte.metrics.lib.MetricsQueries; +import io.airbyte.metrics.lib.MetricQueries; import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -31,12 +33,14 @@ public class MetrisQueriesTest { private static final String USER = "user"; private static final String PASS = "hunter2"; + private static final UUID SRC_DEF_ID = UUID.randomUUID(); + private static final UUID DST_DEF_ID = UUID.randomUUID(); + private static PostgreSQLContainer container; private static Database configDb; - private static DatabaseConfigPersistence configPersistence; @BeforeAll - static void setUpAll() throws IOException { + static void setUpAll() throws IOException, SQLException { container = new PostgreSQLContainer<>("postgres:13-alpine") .withUsername(USER) .withPassword(PASS); @@ -44,56 +48,89 @@ static void setUpAll() throws IOException { final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); configDb = databaseProviders.createNewConfigsDatabase(); - new DatabaseConfigPersistence(configDb); + databaseProviders.createNewJobsDatabase(); + + // create src and dst def + configDb.transaction(ctx -> ctx + .insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, + ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) + .values(SRC_DEF_ID, "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) + .values(DST_DEF_ID, "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) + .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute()); + + // drop the constraint to simplify following test set up + configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute()); + } + + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(ACTOR)); } @Nested class srcIdAndDestIdToReleaseStages { - @AfterEach - void tearDown() throws SQLException { - configDb.transaction(ctx -> ctx.truncate(ACTOR)); - configDb.transaction(ctx -> ctx.truncate(ACTOR_DEFINITION)); + @Test + @DisplayName("should return the right release stages") + void shouldReturnReleaseStages() throws SQLException { + final var srcId = UUID.randomUUID(); + final var dstId = UUID.randomUUID(); + + // create src and dst + configDb.transaction( + ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) + .values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source) + .values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination) + .execute()); + final var res = configDb.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); + assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res); } + @Test + @DisplayName("should not error out or return any result if not applicable") + void shouldReturnNothingIfNotApplicable() throws SQLException { + final var res = configDb.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, UUID.randomUUID(), UUID.randomUUID())); + assertEquals(0, res.size()); + } + + } + + @Nested + class jobIdToReleaseStages { + @Test @DisplayName("should return the right release stages") - public void shouldReturnReleaseStages() throws SQLException { - final var srcDefId = UUID.randomUUID(); - final var dstDefId = UUID.randomUUID(); + void shouldReturnReleaseStages() throws SQLException { final var srcId = UUID.randomUUID(); final var dstId = UUID.randomUUID(); - - // create src and dst def - configDb.transaction(ctx -> ctx - .insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, - ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) - .values(srcDefId, "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) - .values(dstDefId, "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) - .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute()); - // drop foreign key to simplify set up - configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute()); // create src and dst configDb.transaction( ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) - .values(srcId, UUID.randomUUID(), srcDefId, "src", JSONB.valueOf("{}"), ActorType.source) - .values(dstId, UUID.randomUUID(), dstDefId, "dst", JSONB.valueOf("{}"), ActorType.destination) + .values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source) + .values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination) + .execute()); + final var connId = UUID.randomUUID(); + // create connection + configDb.transaction( + ctx -> ctx + .insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID, + CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL) + .values(connId, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true) .execute()); - final var res = configDb.query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); + // create job + final var jobId = 1L; + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE).values(jobId, connId.toString()).execute()); + + final var res = configDb.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId)); assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res); } @Test @DisplayName("should not error out or return any result if not applicable") - public void shouldReturnNothingIfNotApplicable() throws SQLException { - configDb.transaction(ctx -> ctx - .insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, - ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) - .values(UUID.randomUUID(), "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) - .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) - .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute()); - - final var res = configDb.query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, UUID.randomUUID(), UUID.randomUUID())); + void shouldReturnNothingIfNotApplicable() throws SQLException { + final var missingJobId = 100000L; + final var res = configDb.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, missingJobId)); assertEquals(0, res.size()); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index c7c56a6682f31..d05d618c04490 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -20,8 +20,8 @@ import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; import io.airbyte.metrics.lib.AirbyteMetricsRegistry; import io.airbyte.metrics.lib.DogStatsDMetricSingleton; +import io.airbyte.metrics.lib.MetricQueries; import io.airbyte.metrics.lib.MetricTags; -import io.airbyte.metrics.lib.MetricsQueries; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Optional; +import java.util.UUID; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -85,15 +86,8 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { final long jobId = jobFactory.create(input.getConnectionId()); log.info("New job created, with id: " + jobId); - final var srcId = standardSync.getSourceId(); - final var destId = standardSync.getDestinationId(); - - final var releaseStages = configRepository.getDatabase().query(ctx -> MetricsQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, destId)); - if (releaseStages != null) { - for (final ReleaseStage stage : releaseStages) { - DogStatsDMetricSingleton.count(AirbyteMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); - } - } + emitJobToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, standardSync.getSourceId(), + standardSync.getDestinationId()); return new JobCreationOutput(jobId); } @@ -102,6 +96,17 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { } } + private void emitJobToReleaseStagesMetric(final AirbyteMetricsRegistry metric, final UUID srcId, final UUID dstId) throws IOException { + final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); + if (releaseStages == null) { + return; + } + + for (final ReleaseStage stage : releaseStages) { + DogStatsDMetricSingleton.count(metric, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + } + } + @Override public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException { try { @@ -193,9 +198,4 @@ private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus s jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); } - // job -> connection -> source_id & destination_id -> actor_definition -> release stage - private void jobToReleaseStages() { - // config repository might be related to this. - } - } From 80e1a00731bd8476a089db5a7566ce386a8063a4 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 25 Feb 2022 02:48:43 +0800 Subject: [PATCH 5/8] Inject all metrics. --- .../metrics/lib/AirbyteMetricsRegistry.java | 12 +++++ ...obCreationAndStatusUpdateActivityImpl.java | 50 +++++++++++++------ 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java index 7ed3cecc352cc..9682d7874b4e3 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java @@ -30,10 +30,22 @@ */ public enum AirbyteMetricsRegistry { + JOB_CANCELLED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_cancelled_by_release_stage", + "increments when a job is cancelled. jobs are double counted as this is tagged by release stage."), JOB_CREATED_BY_RELEASE_STAGE( MetricEmittingApps.WORKER, "job_created_by_release_stage", "increments when a new job is created. jobs are double counted as this is tagged by release stage."), + JOB_FAILED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_failed_by_release_stage", + "increments when a job fails. jobs are double counted as this is tagged by release stage."), + JOB_SUCCEEDED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_succeeded_by_release_stage", + "increments when a job succeeds. jobs are double counted as this is tagged by release stage."), KUBE_POD_PROCESS_CREATE_TIME_MILLISECS( MetricEmittingApps.WORKER, "kube_pod_process_create_time_millisecs", diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index d05d618c04490..6d4c2defb34d9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -86,8 +86,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { final long jobId = jobFactory.create(input.getConnectionId()); log.info("New job created, with id: " + jobId); - emitJobToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, standardSync.getSourceId(), - standardSync.getDestinationId()); + emitSrcIdDstIdToReleaseStagesMetric(standardSync.getSourceId(), standardSync.getDestinationId()); return new JobCreationOutput(jobId); } @@ -96,14 +95,14 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { } } - private void emitJobToReleaseStagesMetric(final AirbyteMetricsRegistry metric, final UUID srcId, final UUID dstId) throws IOException { + private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID dstId) throws IOException { final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); if (releaseStages == null) { return; } for (final ReleaseStage stage : releaseStages) { - DogStatsDMetricSingleton.count(metric, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + DogStatsDMetricSingleton.count(AirbyteMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); } } @@ -127,15 +126,20 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) @Override public void jobSuccess(final JobSuccessInput input) { try { + final long jobId = input.getJobId(); + final int attemptId = input.getAttemptId(); + if (input.getStandardSyncOutput() != null) { final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput()); - jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput); + jobPersistence.writeOutput(jobId, attemptId, jobOutput); } else { - log.warn("The job {} doesn't have any output for the attempt {}", input.getJobId(), input.getAttemptId()); + log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId); } - jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptId()); - final Job job = jobPersistence.getJob(input.getJobId()); + jobPersistence.succeedAttempt(jobId, attemptId); + final Job job = jobPersistence.getJob(jobId); + jobNotifier.successJob(job); + emitJobIdToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.SUCCEEDED); } catch (final IOException e) { throw new RetryableException(e); @@ -145,9 +149,12 @@ public void jobSuccess(final JobSuccessInput input) { @Override public void jobFailure(final JobFailureInput input) { try { - jobPersistence.failJob(input.getJobId()); - final Job job = jobPersistence.getJob(input.getJobId()); + final var jobId = input.getJobId(); + jobPersistence.failJob(jobId); + final Job job = jobPersistence.getJob(jobId); + jobNotifier.failJob(input.getReason(), job); + emitJobIdToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.FAILED); } catch (final IOException e) { throw new RetryableException(e); @@ -173,11 +180,15 @@ public void attemptFailure(final AttemptFailureInput input) { @Override public void jobCancelled(final JobCancelledInput input) { try { - jobPersistence.cancelJob(input.getJobId()); - jobPersistence.failAttempt(input.getJobId(), input.getAttemptId()); - jobPersistence.writeAttemptFailureSummary(input.getJobId(), input.getAttemptId(), input.getAttemptFailureSummary()); - final Job job = jobPersistence.getJob(input.getJobId()); + final long jobId = input.getJobId(); + jobPersistence.cancelJob(jobId); + final int attemptId = input.getAttemptId(); + jobPersistence.failAttempt(jobId, attemptId); + jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary()); + + final Job job = jobPersistence.getJob(jobId); trackCompletion(job, JobStatus.FAILED); + emitJobIdToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); jobNotifier.failJob("Job was cancelled", job); } catch (final IOException e) { throw new RetryableException(e); @@ -194,6 +205,17 @@ public void reportJobStart(final ReportJobStartInput input) { } } + private void emitJobIdToReleaseStagesMetric(final AirbyteMetricsRegistry metric, final long jobId) throws IOException { + final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId)); + if (releaseStages == null) { + return; + } + + for (final ReleaseStage stage : releaseStages) { + DogStatsDMetricSingleton.count(metric, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + } + } + private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus status) { jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); } From 87322044fdbc37e77f732b21e022e98f96d20bc4 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 25 Feb 2022 14:38:16 +0800 Subject: [PATCH 6/8] Respond to PR feedback. --- .../metrics/lib/DogStatsDMetricSingleton.java | 14 +++++++------- .../airbyte/metrics/lib/MetricEmittingApp.java | 2 +- .../io/airbyte/metrics/lib/MetricQueries.java | 18 +++++++++--------- ...tricsRegistry.java => MetricsRegistry.java} | 4 ++-- .../DogStatsDMetricSingletonTest.java | 12 ++++-------- .../{llib => lib}/MetrisQueriesTest.java | 3 +-- .../workers/process/KubePodProcess.java | 4 ++-- ...JobCreationAndStatusUpdateActivityImpl.java | 12 ++++++------ ...JobCreationAndStatusUpdateActivityTest.java | 6 ++++++ 9 files changed, 38 insertions(+), 37 deletions(-) rename airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/{AirbyteMetricsRegistry.java => MetricsRegistry.java} (94%) rename airbyte-metrics/lib/src/test/java/io/airbyte/metrics/{llib => lib}/DogStatsDMetricSingletonTest.java (68%) rename airbyte-metrics/lib/src/test/java/io/airbyte/metrics/{llib => lib}/MetrisQueriesTest.java (98%) diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java index fe5276a6f47b3..5cfec14e606bd 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java @@ -62,7 +62,7 @@ public synchronized static void flush() { * @param amt to adjust. * @param tags */ - public static void count(final AirbyteMetricsRegistry metric, final double amt, final String... tags) { + public static void count(final MetricsRegistry metric, final double amt, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -82,7 +82,7 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt, * @param val to record. * @param tags */ - public static void gauge(final AirbyteMetricsRegistry metric, final double val, final String... tags) { + public static void gauge(final MetricsRegistry metric, final double val, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -109,7 +109,7 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val, * @param val of time to record. * @param tags */ - public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final double val, final String... tags) { + public static void recordTimeLocal(final MetricsRegistry metric, final double val, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -130,7 +130,7 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do * @param val of time to record. * @param tags */ - public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final double val, final String... tags) { + public static void recordTimeGlobal(final MetricsRegistry metric, final double val, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -144,14 +144,14 @@ public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final d } /** - * Wrapper of {@link #recordTimeGlobal(AirbyteMetricsRegistry, double, String...)} with a runnable - * for convenience. + * Wrapper of {@link #recordTimeGlobal(MetricsRegistry, double, String...)} with a runnable for + * convenience. * * @param metric * @param runnable to time * @param tags */ - public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final Runnable runnable, final String... tags) { + public static void recordTimeGlobal(final MetricsRegistry metric, final Runnable runnable, final String... tags) { final long start = System.currentTimeMillis(); runnable.run(); final long end = System.currentTimeMillis(); diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java index c47700d23b170..5c538bb55e8e3 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java @@ -7,7 +7,7 @@ /** * Interface representing an Airbyte Application to collect metrics for. This interface is present * as Java doesn't support enum inheritance as of Java 17. We use a shared interface so this - * interface can be used in the {@link AirbyteMetricsRegistry} enum. + * interface can be used in the {@link MetricsRegistry} enum. */ public interface MetricEmittingApp { diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java index 711c2f90ac796..21e83cec27287 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java @@ -22,15 +22,15 @@ public static List jobIdToReleaseStages(final DSLContext ctx, fina final var dstRelStageCol = "dst_release_stage"; final var query = String.format(""" - select src_def_data.release_stage as %s, - dest_def_data.release_stage as %s - from connection - inner join jobs on connection.id=CAST(jobs.scope AS uuid) - inner join actor as dest_data on connection.destination_id = dest_data.id - inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id - inner join actor as src_data on connection.source_id = src_data.id - inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id - where jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId); + SELECT src_def_data.release_stage AS %s, + dest_def_data.release_stage AS %s + FROM connection + INNER JOIN jobs ON connection.id=CAST(jobs.scope AS uuid) + INNER JOIN actor AS dest_data ON connection.destination_id = dest_data.id + INNER JOIN actor_definition AS dest_def_data ON dest_data.actor_definition_id = dest_def_data.id + INNER JOIN actor AS src_data ON connection.source_id = src_data.id + INNER JOIN actor_definition AS src_def_data ON src_data.actor_definition_id = src_def_data.id + WHERE jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId); final var res = ctx.fetch(query); final var stages = res.getValues(srcRelStageCol, ReleaseStage.class); diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java similarity index 94% rename from airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java rename to airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java index 9682d7874b4e3..e80b27845618b 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java @@ -28,7 +28,7 @@ * - Add units at name end if applicable. This is especially relevant for time units. versioning * tactic and present at the end of the metric. */ -public enum AirbyteMetricsRegistry { +public enum MetricsRegistry { JOB_CANCELLED_BY_RELEASE_STAGE( MetricEmittingApps.WORKER, @@ -55,7 +55,7 @@ public enum AirbyteMetricsRegistry { public final String metricName; public final String metricDescription; - AirbyteMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) { + MetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) { Preconditions.checkNotNull(metricDescription); Preconditions.checkNotNull(application); diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/DogStatsDMetricSingletonTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/DogStatsDMetricSingletonTest.java similarity index 68% rename from airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/DogStatsDMetricSingletonTest.java rename to airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/DogStatsDMetricSingletonTest.java index 00fad98f848bc..dc6a929bea2de 100644 --- a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/DogStatsDMetricSingletonTest.java +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/DogStatsDMetricSingletonTest.java @@ -2,12 +2,8 @@ * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ -package io.airbyte.metrics.llib; +package io.airbyte.metrics.lib; -import io.airbyte.metrics.lib.AirbyteMetricsRegistry; -import io.airbyte.metrics.lib.DatadogClientConfiguration; -import io.airbyte.metrics.lib.DogStatsDMetricSingleton; -import io.airbyte.metrics.lib.MetricEmittingApps; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -25,7 +21,7 @@ void tearDown() { public void testPublishTrueNoEmitError() { Assertions.assertDoesNotThrow(() -> { DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", false)); - DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); + DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); }); } @@ -34,7 +30,7 @@ public void testPublishTrueNoEmitError() { public void testPublishFalseNoEmitError() { Assertions.assertDoesNotThrow(() -> { DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", true)); - DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); + DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); }); } @@ -42,7 +38,7 @@ public void testPublishFalseNoEmitError() { @DisplayName("there should be no exception if we attempt to emit metrics without initializing") public void testNoInitializeNoEmitError() { Assertions.assertDoesNotThrow(() -> { - DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); + DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); }); } diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java similarity index 98% rename from airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java rename to airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java index 8579e6b336664..b55bd1c51ae53 100644 --- a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/MetrisQueriesTest.java +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ -package io.airbyte.metrics.llib; +package io.airbyte.metrics.lib; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; @@ -15,7 +15,6 @@ import io.airbyte.db.instance.configs.jooq.enums.NamespaceDefinitionType; import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; import io.airbyte.db.instance.test.TestDatabaseProviders; -import io.airbyte.metrics.lib.MetricQueries; import java.io.IOException; import java.sql.SQLException; import java.util.List; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 28c0f1f3825b3..1fbf15a996e6e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -10,8 +10,8 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.TolerationPOJO; -import io.airbyte.metrics.lib.AirbyteMetricsRegistry; import io.airbyte.metrics.lib.DogStatsDMetricSingleton; +import io.airbyte.metrics.lib.MetricsRegistry; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerPort; @@ -510,7 +510,7 @@ public KubePodProcess(final boolean isOrchestrator, final boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p); return isReady || isTerminal(p); }, 20, TimeUnit.MINUTES); - DogStatsDMetricSingleton.recordTimeGlobal(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start); + DogStatsDMetricSingleton.recordTimeGlobal(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start); // allow writing stdin to pod LOGGER.info("Reading pod IP..."); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 6d4c2defb34d9..8b0228a4d7e34 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -18,10 +18,10 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; -import io.airbyte.metrics.lib.AirbyteMetricsRegistry; import io.airbyte.metrics.lib.DogStatsDMetricSingleton; import io.airbyte.metrics.lib.MetricQueries; import io.airbyte.metrics.lib.MetricTags; +import io.airbyte.metrics.lib.MetricsRegistry; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; @@ -102,7 +102,7 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds } for (final ReleaseStage stage : releaseStages) { - DogStatsDMetricSingleton.count(AirbyteMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + DogStatsDMetricSingleton.count(MetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); } } @@ -139,7 +139,7 @@ public void jobSuccess(final JobSuccessInput input) { final Job job = jobPersistence.getJob(jobId); jobNotifier.successJob(job); - emitJobIdToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.SUCCEEDED); } catch (final IOException e) { throw new RetryableException(e); @@ -154,7 +154,7 @@ public void jobFailure(final JobFailureInput input) { final Job job = jobPersistence.getJob(jobId); jobNotifier.failJob(input.getReason(), job); - emitJobIdToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.FAILED); } catch (final IOException e) { throw new RetryableException(e); @@ -188,7 +188,7 @@ public void jobCancelled(final JobCancelledInput input) { final Job job = jobPersistence.getJob(jobId); trackCompletion(job, JobStatus.FAILED); - emitJobIdToReleaseStagesMetric(AirbyteMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); jobNotifier.failJob("Job was cancelled", job); } catch (final IOException e) { throw new RetryableException(e); @@ -205,7 +205,7 @@ public void reportJobStart(final ReportJobStartInput input) { } } - private void emitJobIdToReleaseStagesMetric(final AirbyteMetricsRegistry metric, final long jobId) throws IOException { + private void emitJobIdToReleaseStagesMetric(final MetricsRegistry metric, final long jobId) throws IOException { final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId)); if (releaseStages == null) { return; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 5110f7261bc7b..205045b3051dc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -169,6 +169,8 @@ class Update { @Test public void setJobSuccess() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput)); Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput); @@ -189,6 +191,8 @@ public void setJobSuccessWrapException() throws IOException { @Test public void setJobFailure() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason")); Mockito.verify(mJobPersistence).failJob(JOB_ID); @@ -228,6 +232,8 @@ public void setAttemptFailureWrapException() throws IOException { @Test public void setJobCancelled() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, failureSummary)); Mockito.verify(mJobPersistence).cancelJob(JOB_ID); From d636cc0a1e94fc2893831aa38892df29dfcb162d Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 25 Feb 2022 14:47:54 +0800 Subject: [PATCH 7/8] More method refactoring. --- .../src/main/java/io/airbyte/metrics/lib/MetricTags.java | 8 +++++++- .../JobCreationAndStatusUpdateActivityImpl.java | 4 ++-- .../JobCreationAndStatusUpdateActivityTest.java | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java index f3d74e67a4084..9e2b98d5659b4 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -4,11 +4,17 @@ package io.airbyte.metrics.lib; +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; + /** * Keep track of all metric tags. */ public class MetricTags { - public static final String RELEASE_STAGE = "release_stage:"; + private static final String RELEASE_STAGE = "release_stage:"; + + public static String getReleaseStage(final ReleaseStage stage) { + return RELEASE_STAGE + ":" + stage.getLiteral(); + } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 8b0228a4d7e34..47e49165ac503 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -102,7 +102,7 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds } for (final ReleaseStage stage : releaseStages) { - DogStatsDMetricSingleton.count(MetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + DogStatsDMetricSingleton.count(MetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.getReleaseStage(stage)); } } @@ -212,7 +212,7 @@ private void emitJobIdToReleaseStagesMetric(final MetricsRegistry metric, final } for (final ReleaseStage stage : releaseStages) { - DogStatsDMetricSingleton.count(metric, 1, MetricTags.RELEASE_STAGE + stage.getLiteral()); + DogStatsDMetricSingleton.count(metric, 1, MetricTags.getReleaseStage(stage)); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 205045b3051dc..475f93c7dffcc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -233,7 +233,7 @@ public void setAttemptFailureWrapException() throws IOException { @Test public void setJobCancelled() throws IOException { Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); - + jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, failureSummary)); Mockito.verify(mJobPersistence).cancelJob(JOB_ID); From 8b8d2454352930d9e74625c5035fe6871bc3557c Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 25 Feb 2022 19:15:16 +0800 Subject: [PATCH 8/8] Fix wrong tag and metric name. --- .../src/main/java/io/airbyte/metrics/lib/MetricTags.java | 8 ++++++-- .../JobCreationAndStatusUpdateActivityImpl.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java index 9e2b98d5659b4..157514e9fcb7b 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -11,10 +11,14 @@ */ public class MetricTags { - private static final String RELEASE_STAGE = "release_stage:"; + private static final String RELEASE_STAGE = "release_stage"; public static String getReleaseStage(final ReleaseStage stage) { - return RELEASE_STAGE + ":" + stage.getLiteral(); + return tagDelimit(RELEASE_STAGE, stage.getLiteral()); + } + + private static String tagDelimit(final String tagName, final String tagVal) { + return String.join(":", tagName, tagVal); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 47e49165ac503..334d422f3836d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -188,7 +188,7 @@ public void jobCancelled(final JobCancelledInput input) { final Job job = jobPersistence.getJob(jobId); trackCompletion(job, JobStatus.FAILED); - emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_CANCELLED_BY_RELEASE_STAGE, jobId); jobNotifier.failJob("Job was cancelled", job); } catch (final IOException e) { throw new RetryableException(e);