From d0b9de1c3ef32ce98547e985e6caa48670ccc314 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 28 Jun 2022 14:51:19 -0700 Subject: [PATCH] Change the persistence activity to use the new persistence layer (#14205) * Change the persistence activity to use the new persistence layer * Use lombok * format * Use new State message helper --- .../java/io/airbyte/workers/WorkerApp.java | 9 ++- .../sync/PersistStateActivityImpl.java | 26 ++++---- .../sync/PersistStateActivityTest.java | 60 +++++++++++++++++++ 3 files changed, 80 insertions(+), 15 deletions(-) create mode 100644 airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/PersistStateActivityTest.java diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 54c6b9af7f2ca..23c309e202181 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -19,6 +19,7 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.StatePersistence; import io.airbyte.config.persistence.StreamResetPersistence; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.config.persistence.split_secrets.SecretPersistence; @@ -140,6 +141,7 @@ public class WorkerApp { private final JobErrorReporter jobErrorReporter; private final StreamResetPersistence streamResetPersistence; private final FeatureFlags featureFlags; + private final StatePersistence statePersistence; public void start() { final Map mdc = MDC.getCopyOfContextMap(); @@ -225,7 +227,7 @@ private void registerSync(final WorkerFactory factory) { defaultWorkerConfigs, defaultProcessFactory); - final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository); + final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(statePersistence, featureFlags); final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class); @@ -447,6 +449,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf new JobErrorReporter(configRepository, configs.getDeploymentMode(), configs.getAirbyteVersionOrWarning(), jobErrorReportingClient); final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase); + + final StatePersistence statePersistence = new StatePersistence(configDatabase); new WorkerApp( workspaceRoot, defaultProcessFactory, @@ -476,7 +480,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf jobTracker, jobErrorReporter, streamResetPersistence, - featureFlags).start(); + featureFlags, + statePersistence).start(); } public static void main(final String[] args) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index efd29635e90af..e100e79fa2725 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -4,32 +4,32 @@ package io.airbyte.workers.temporal.sync; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.State; -import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.helpers.StateMessageHelper; +import io.airbyte.config.persistence.StatePersistence; import java.io.IOException; -import java.nio.file.Path; +import java.util.Optional; import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.AllArgsConstructor; +@AllArgsConstructor public class PersistStateActivityImpl implements PersistStateActivity { - private static final Logger LOGGER = LoggerFactory.getLogger(PersistStateActivityImpl.class); - private final Path workspaceRoot; - private final ConfigRepository configRepository; - - public PersistStateActivityImpl(final Path workspaceRoot, final ConfigRepository configRepository) { - this.workspaceRoot = workspaceRoot; - this.configRepository = configRepository; - } + private final StatePersistence statePersistence; + private final FeatureFlags featureFlags; @Override public boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput) { final State state = syncOutput.getState(); if (state != null) { try { - configRepository.updateConnectionState(connectionId, state); + final Optional maybeStateWrapper = StateMessageHelper.getTypedState(state.getState(), featureFlags.useStreamCapableState()); + if (maybeStateWrapper.isPresent()) { + statePersistence.updateOrCreateState(connectionId, maybeStateWrapper.get()); + } } catch (final IOException e) { throw new RuntimeException(e); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/PersistStateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/PersistStateActivityTest.java new file mode 100644 index 0000000000000..4d51970e0f8a9 --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/PersistStateActivityTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.sync; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.State; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.persistence.StatePersistence; +import java.io.IOException; +import java.util.UUID; +import org.elasticsearch.common.collect.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class PersistStateActivityTest { + + private final static UUID CONNECTION_ID = UUID.randomUUID(); + + @Mock + StatePersistence statePersistence; + + @Mock + FeatureFlags featureFlags; + + @InjectMocks + PersistStateActivityImpl persistStateActivity; + + @Test + public void testPersistEmpty() { + persistStateActivity.persist(CONNECTION_ID, new StandardSyncOutput()); + + Mockito.verifyNoInteractions(statePersistence); + } + + @Test + public void testPersist() throws IOException { + Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true); + + final JsonNode jsonState = Jsons.jsonNode(Map.ofEntries( + Map.entry("some", "state"))); + + final State state = new State().withState(jsonState); + + persistStateActivity.persist(CONNECTION_ID, new StandardSyncOutput().withState(state)); + + // The ser/der of the state into a state wrapper is tested in StateMessageHelperTest + Mockito.verify(statePersistence).updateOrCreateState(Mockito.eq(CONNECTION_ID), Mockito.any(StateWrapper.class)); + } + +}