diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 295112266ecaa..2e321793c832b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -14,6 +14,7 @@ import static io.airbyte.persistence.job.models.AttemptStatus.FAILED; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import datadog.trace.api.Trace; import io.airbyte.commons.docker.DockerUtils; @@ -66,7 +67,6 @@ import jakarta.inject.Singleton; import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -483,10 +483,13 @@ private void failNonTerminalJobs(final UUID connectionId) { ReleaseStage.generally_available, 4); private static final Comparator RELEASE_STAGE_COMPARATOR = Comparator.comparingInt(RELEASE_STAGE_ORDER::get); - private static List orderByReleaseStageAsc(final List releaseStages) { - final List copiedList = new ArrayList<>(releaseStages); - copiedList.sort(RELEASE_STAGE_COMPARATOR); - return copiedList; + @VisibleForTesting + static List orderByReleaseStageAsc(final List releaseStages) { + // Using collector to get a mutable list + return releaseStages.stream() + .filter(stage -> stage != null) + .sorted(RELEASE_STAGE_COMPARATOR) + .toList(); } /** diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index ccdb8447fd6c4..0efcb27e5c6d3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -33,6 +33,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.StreamResetPersistence; +import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage; import io.airbyte.persistence.job.JobCreator; import io.airbyte.persistence.job.JobNotifier; import io.airbyte.persistence.job.JobPersistence; @@ -535,4 +536,13 @@ void ensureCleanJobState() throws IOException { } + @Test + void testReleaseStageOrdering() { + final List input = List.of(ReleaseStage.alpha, ReleaseStage.custom, ReleaseStage.beta, ReleaseStage.generally_available); + final List expected = List.of(ReleaseStage.custom, ReleaseStage.alpha, ReleaseStage.beta, ReleaseStage.generally_available); + + Assertions.assertThat(JobCreationAndStatusUpdateActivityImpl.orderByReleaseStageAsc(input)) + .containsExactlyElementsOf(expected); + } + }