-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Common code to deserialize a state message in the new format (#13772)
* Common code to deserialize a state message in the new format * PR comments and type changed to typed * Format * Add StateType and StateWrapper objects to the model * Use state wrapper instead of Either * Switch to optional * PR comments * Support array legacy state * format Co-authored-by: Jimmy Ma <jimmy@airbyte.io>
- Loading branch information
1 parent
74d16cc
commit 5852989
Showing
2 changed files
with
185 additions
and
0 deletions.
There are no files selected for viewing
62 changes: 62 additions & 0 deletions
62
airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/StateMessageHelper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.config.helpers; | ||
|
||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.StateType; | ||
import io.airbyte.config.StateWrapper; | ||
import io.airbyte.protocol.models.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
public class StateMessageHelper { | ||
|
||
public static class AirbyteStateMessageListTypeReference extends TypeReference<List<AirbyteStateMessage>> {} | ||
|
||
/** | ||
* This a takes a json blob state and tries return either a legacy state in the format of a json | ||
* object or a state message with the new format which is a list of airbyte state message. | ||
* | ||
* @param state - a blob representing the state | ||
* @return An optional state wrapper, if there is no state an empty optional will be returned | ||
*/ | ||
public static Optional<StateWrapper> getTypedState(final JsonNode state) { | ||
if (state == null) { | ||
return Optional.empty(); | ||
} else { | ||
final List<AirbyteStateMessage> stateMessages; | ||
try { | ||
stateMessages = Jsons.object(state, new AirbyteStateMessageListTypeReference()); | ||
} catch (final IllegalArgumentException e) { | ||
return Optional.of(getLegacyStateWrapper(state)); | ||
} | ||
if (stateMessages.stream().anyMatch(streamMessage -> !streamMessage.getAdditionalProperties().isEmpty())) { | ||
return Optional.of(getLegacyStateWrapper(state)); | ||
} | ||
if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) { | ||
return Optional.of(new StateWrapper() | ||
.withStateType(StateType.GLOBAL) | ||
.withGlobal(stateMessages.get(0))); | ||
} else if (stateMessages.size() >= 1 | ||
&& stateMessages.stream().allMatch(stateMessage -> stateMessage.getStateType() == AirbyteStateType.STREAM)) { | ||
return Optional.of(new StateWrapper() | ||
.withStateType(StateType.STREAM) | ||
.withStateMessages(stateMessages)); | ||
} else { | ||
throw new IllegalStateException("Unexpected state blob"); | ||
} | ||
} | ||
} | ||
|
||
private static StateWrapper getLegacyStateWrapper(final JsonNode state) { | ||
return new StateWrapper() | ||
.withStateType(StateType.LEGACY) | ||
.withLegacyState(state); | ||
} | ||
|
||
} |
123 changes: 123 additions & 0 deletions
123
...-config/config-models/src/test/java/io/airbyte/config/helpers/StateMessageHelperTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.config.helpers; | ||
|
||
import com.google.common.collect.Lists; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.StateType; | ||
import io.airbyte.config.StateWrapper; | ||
import io.airbyte.protocol.models.AirbyteGlobalState; | ||
import io.airbyte.protocol.models.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; | ||
import io.airbyte.protocol.models.AirbyteStreamState; | ||
import io.airbyte.protocol.models.StreamDescriptor; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import org.assertj.core.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class StateMessageHelperTest { | ||
|
||
@Test | ||
public void testEmpty() { | ||
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(null); | ||
Assertions.assertThat(stateWrapper).isEmpty(); | ||
} | ||
|
||
@Test | ||
public void testLegacy() { | ||
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.emptyObject()); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY); | ||
} | ||
|
||
@Test | ||
public void testLegacyInList() { | ||
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode( | ||
Lists.newArrayList( | ||
Map.of("Any", "value")))); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY); | ||
} | ||
|
||
@Test | ||
public void testGlobal() { | ||
final AirbyteStateMessage stateMessage = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage))); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.GLOBAL); | ||
Assertions.assertThat(stateWrapper.get().getGlobal()).isEqualTo(stateMessage); | ||
} | ||
|
||
@Test | ||
public void testStream() { | ||
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject())); | ||
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())); | ||
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.STREAM); | ||
Assertions.assertThat(stateWrapper.get().getStateMessages()).containsExactlyInAnyOrder(stateMessage1, stateMessage2); | ||
} | ||
|
||
@Test | ||
public void testInvalidMixedState() { | ||
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject())); | ||
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)))) | ||
.isInstanceOf(IllegalStateException.class); | ||
} | ||
|
||
@Test | ||
public void testDuplicatedGlobalState() { | ||
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)))) | ||
.isInstanceOf(IllegalStateException.class); | ||
} | ||
|
||
@Test | ||
public void testEmptyStateList() { | ||
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList()))) | ||
.isInstanceOf(IllegalStateException.class); | ||
} | ||
|
||
} |