From 84bd9836f7027fa69f4370b4aa792868cf264d88 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Fri, 20 Jan 2023 17:41:23 -0800 Subject: [PATCH 1/3] Fix an npe on a sort fuction --- .../JobCreationAndStatusUpdateActivityImpl.java | 10 +++++++--- .../JobCreationAndStatusUpdateActivityTest.java | 10 ++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 295112266ecaa..96d8b4b1205f8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -78,6 +78,8 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; + import lombok.extern.slf4j.Slf4j; @Slf4j @@ -484,9 +486,11 @@ private void failNonTerminalJobs(final UUID connectionId) { private static final Comparator RELEASE_STAGE_COMPARATOR = Comparator.comparingInt(RELEASE_STAGE_ORDER::get); private static List orderByReleaseStageAsc(final List releaseStages) { - final List copiedList = new ArrayList<>(releaseStages); - copiedList.sort(RELEASE_STAGE_COMPARATOR); - return copiedList; + // Using collector to get a mutable list + return releaseStages.stream() + .filter(stage -> stage != null) + .sorted(RELEASE_STAGE_COMPARATOR) + .toList(); } /** diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index ccdb8447fd6c4..4a33bcc304bb5 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -33,6 +33,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.StreamResetPersistence; +import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage; import io.airbyte.persistence.job.JobCreator; import io.airbyte.persistence.job.JobNotifier; import io.airbyte.persistence.job.JobPersistence; @@ -535,4 +536,13 @@ void ensureCleanJobState() throws IOException { } + @Test + void testReleaseStageOrdering() { + final List input = List.of(ReleaseStage.alpha, ReleaseStage.custom, ReleaseStage.beta, ReleaseStage.generally_available); + final List expected = List.of(ReleaseStage.custom, ReleaseStage.alpha, ReleaseStage.beta, ReleaseStage.generally_available); + + Assertions.assertThat(JobCreationAndStatusUpdateActivityImpl.orderByReleaseStageAsc(input)) + .containsExactlyElementsOf(expected); + } + } From b955fafde4d71c2c14d5cf468c4e8c6e0444195a Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Sat, 21 Jan 2023 07:06:05 -0800 Subject: [PATCH 2/3] Format --- .../JobCreationAndStatusUpdateActivityImpl.java | 9 +++------ .../JobCreationAndStatusUpdateActivityTest.java | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 96d8b4b1205f8..341c8c86030a0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -66,7 +66,6 @@ import jakarta.inject.Singleton; import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -78,8 +77,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.Stream; - import lombok.extern.slf4j.Slf4j; @Slf4j @@ -488,9 +485,9 @@ private void failNonTerminalJobs(final UUID connectionId) { private static List orderByReleaseStageAsc(final List releaseStages) { // Using collector to get a mutable list return releaseStages.stream() - .filter(stage -> stage != null) - .sorted(RELEASE_STAGE_COMPARATOR) - .toList(); + .filter(stage -> stage != null) + .sorted(RELEASE_STAGE_COMPARATOR) + .toList(); } /** diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 4a33bcc304bb5..0efcb27e5c6d3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -542,7 +542,7 @@ void testReleaseStageOrdering() { final List expected = List.of(ReleaseStage.custom, ReleaseStage.alpha, ReleaseStage.beta, ReleaseStage.generally_available); Assertions.assertThat(JobCreationAndStatusUpdateActivityImpl.orderByReleaseStageAsc(input)) - .containsExactlyElementsOf(expected); + .containsExactlyElementsOf(expected); } } From d774e42a3f9ec9b2cb12ff79a4d3cb19b6883f02 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Mon, 23 Jan 2023 08:12:44 -0800 Subject: [PATCH 3/3] Fix build --- .../activities/JobCreationAndStatusUpdateActivityImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 341c8c86030a0..2e321793c832b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -14,6 +14,7 @@ import static io.airbyte.persistence.job.models.AttemptStatus.FAILED; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import datadog.trace.api.Trace; import io.airbyte.commons.docker.DockerUtils; @@ -482,7 +483,8 @@ private void failNonTerminalJobs(final UUID connectionId) { ReleaseStage.generally_available, 4); private static final Comparator RELEASE_STAGE_COMPARATOR = Comparator.comparingInt(RELEASE_STAGE_ORDER::get); - private static List orderByReleaseStageAsc(final List releaseStages) { + @VisibleForTesting + static List orderByReleaseStageAsc(final List releaseStages) { // Using collector to get a mutable list return releaseStages.stream() .filter(stage -> stage != null)