Skip to content

Commit

Permalink
Change the persistence activity to use the new persistence layer (#14205
Browse files Browse the repository at this point in the history
)

* Change the persistence activity to use the new persistence layer

* Use lombok

* format

* Use new State message helper
  • Loading branch information
benmoriceau authored Jun 28, 2022
1 parent ca7b92f commit d0b9de1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
Expand Down Expand Up @@ -140,6 +141,7 @@ public class WorkerApp {
private final JobErrorReporter jobErrorReporter;
private final StreamResetPersistence streamResetPersistence;
private final FeatureFlags featureFlags;
private final StatePersistence statePersistence;

public void start() {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
Expand Down Expand Up @@ -225,7 +227,7 @@ private void registerSync(final WorkerFactory factory) {
defaultWorkerConfigs,
defaultProcessFactory);

final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository);
final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(statePersistence, featureFlags);

final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class);
Expand Down Expand Up @@ -447,6 +449,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
new JobErrorReporter(configRepository, configs.getDeploymentMode(), configs.getAirbyteVersionOrWarning(), jobErrorReportingClient);

final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase);

final StatePersistence statePersistence = new StatePersistence(configDatabase);
new WorkerApp(
workspaceRoot,
defaultProcessFactory,
Expand Down Expand Up @@ -476,7 +480,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
jobTracker,
jobErrorReporter,
streamResetPersistence,
featureFlags).start();
featureFlags,
statePersistence).start();
}

public static void main(final String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,32 @@

package io.airbyte.workers.temporal.sync;

import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.State;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.config.persistence.StatePersistence;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class PersistStateActivityImpl implements PersistStateActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(PersistStateActivityImpl.class);
private final Path workspaceRoot;
private final ConfigRepository configRepository;

public PersistStateActivityImpl(final Path workspaceRoot, final ConfigRepository configRepository) {
this.workspaceRoot = workspaceRoot;
this.configRepository = configRepository;
}
private final StatePersistence statePersistence;
private final FeatureFlags featureFlags;

@Override
public boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput) {
final State state = syncOutput.getState();
if (state != null) {
try {
configRepository.updateConnectionState(connectionId, state);
final Optional<StateWrapper> maybeStateWrapper = StateMessageHelper.getTypedState(state.getState(), featureFlags.useStreamCapableState());
if (maybeStateWrapper.isPresent()) {
statePersistence.updateOrCreateState(connectionId, maybeStateWrapper.get());
}
} catch (final IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.State;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.persistence.StatePersistence;
import java.io.IOException;
import java.util.UUID;
import org.elasticsearch.common.collect.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class PersistStateActivityTest {

private final static UUID CONNECTION_ID = UUID.randomUUID();

@Mock
StatePersistence statePersistence;

@Mock
FeatureFlags featureFlags;

@InjectMocks
PersistStateActivityImpl persistStateActivity;

@Test
public void testPersistEmpty() {
persistStateActivity.persist(CONNECTION_ID, new StandardSyncOutput());

Mockito.verifyNoInteractions(statePersistence);
}

@Test
public void testPersist() throws IOException {
Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true);

final JsonNode jsonState = Jsons.jsonNode(Map.ofEntries(
Map.entry("some", "state")));

final State state = new State().withState(jsonState);

persistStateActivity.persist(CONNECTION_ID, new StandardSyncOutput().withState(state));

// The ser/der of the state into a state wrapper is tested in StateMessageHelperTest
Mockito.verify(statePersistence).updateOrCreateState(Mockito.eq(CONNECTION_ID), Mockito.any(StateWrapper.class));
}

}

0 comments on commit d0b9de1

Please sign in to comment.