-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Common code to deserialize a state message in the new format #13772
Changes from 9 commits
6afbdbe
ee7fd54
545e6ad
b2e3250
7707a97
e3c881b
27adc8b
4ee75aa
c213f71
280b114
9cb57f6
0ea7eba
0ae05a2
59eba26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.config.helpers; | ||
|
||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.StateType; | ||
import io.airbyte.config.StateWrapper; | ||
import io.airbyte.protocol.models.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
public class StateMessageHelper { | ||
|
||
public static class AirbyteStateMessageListTypeReference extends TypeReference<List<AirbyteStateMessage>> {} | ||
|
||
/** | ||
* This a takes a json blob state and tries return either a legacy state in the format of a json | ||
* object or a state message with the new format which is a list of airbyte state message. | ||
* | ||
* @param state | ||
* @return Either a json blob (on the left) or a structure state message. | ||
*/ | ||
public static Optional<StateWrapper> getTypedState(JsonNode state) { | ||
if (state == null) { | ||
return Optional.empty(); | ||
} else { | ||
try { | ||
List<AirbyteStateMessage> stateMessages = Jsons.object(state, new AirbyteStateMessageListTypeReference()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you make sure your IDE is turned on to add finals, please? docs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) { | ||
return Optional.of(new StateWrapper() | ||
.withStateType(StateType.GLOBAL) | ||
.withGlobal(stateMessages.get(0))); | ||
} else if (stateMessages.size() >= 1 | ||
&& stateMessages.stream().filter(stateMessage -> stateMessage.getStateType() != AirbyteStateType.STREAM).toList().isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you need this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, I do something similar to this in the
Something like that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I considered that having a validation would be better given the fact that this object comes from a blob and can be anything. I need to check that the list contains at least element for the per stream state because having an empty list is not valid (There is no way to tell if it is global or per stream and we should save a state like that). That raised another question which is: is |
||
return Optional.of(new StateWrapper() | ||
.withStateType(StateType.STREAM) | ||
.withStateMessages(stateMessages)); | ||
} else { | ||
throw new IllegalStateException("Unexpected state blob"); | ||
} | ||
|
||
} catch (final IllegalArgumentException e) { | ||
return Optional.of(new StateWrapper() | ||
.withStateType(StateType.LEGACY) | ||
.withLegacyState(state)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a nit but i would suggest moving this into a helper method. the catch being so far from where the exception is thrown isn't very easy to parse.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved the catch block closer to the sentence throwing an exception. I believe that the state type makes if very explicit about what is being returned and thus the extra method is not required. |
||
} | ||
/* | ||
* if (state == null) { // return Either.right(new ArrayList<>()); } try { return | ||
* Either.right(Jsons.object(state, new AirbyteStateMessageListTypeReference())); } catch (final | ||
* IllegalArgumentException e) { return Either.left(state); } | ||
*/ | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.config.helpers; | ||
|
||
import com.google.common.collect.Lists; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.StateType; | ||
import io.airbyte.config.StateWrapper; | ||
import io.airbyte.protocol.models.AirbyteGlobalState; | ||
import io.airbyte.protocol.models.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; | ||
import io.airbyte.protocol.models.AirbyteStreamState; | ||
import io.airbyte.protocol.models.StreamDescriptor; | ||
import java.util.Optional; | ||
import org.assertj.core.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class StateMessageHelperTest { | ||
|
||
@Test | ||
public void testEmpty() { | ||
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(null); | ||
Assertions.assertThat(stateWrapper).isEmpty(); | ||
} | ||
|
||
@Test | ||
public void testLegacy() { | ||
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.emptyObject()); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY); | ||
} | ||
|
||
@Test | ||
public void testGlobal() { | ||
AirbyteStateMessage stateMessage = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage))); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.GLOBAL); | ||
Assertions.assertThat(stateWrapper.get().getGlobal()).isEqualTo(stateMessage); | ||
} | ||
|
||
@Test | ||
public void testStream() { | ||
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject())); | ||
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())); | ||
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))); | ||
Assertions.assertThat(stateWrapper).isNotEmpty(); | ||
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.STREAM); | ||
Assertions.assertThat(stateWrapper.get().getStateMessages()).containsExactlyInAnyOrder(stateMessage1, stateMessage2); | ||
} | ||
|
||
@Test | ||
public void testInvalidMixedState() { | ||
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.STREAM) | ||
.withStream( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject())); | ||
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)))) | ||
.isInstanceOf(IllegalStateException.class); | ||
} | ||
|
||
@Test | ||
public void testDuplicatedGlobalState() { | ||
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() | ||
.withStateType(AirbyteStateType.GLOBAL) | ||
.withGlobal( | ||
new AirbyteGlobalState() | ||
.withSharedState(Jsons.emptyObject()) | ||
.withStreamStates(Lists.newArrayList( | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), | ||
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); | ||
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)))) | ||
.isInstanceOf(IllegalStateException.class); | ||
} | ||
|
||
@Test | ||
public void testEmptyStateList() { | ||
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList()))) | ||
.isInstanceOf(IllegalStateException.class); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,7 +124,7 @@ spotless { | |
eclipse('4.21.0').configFile(rootProject.file('tools/gradle/codestyle/java-google-style.xml')) | ||
|
||
licenseHeaderFile createJavaLicenseWith(rootProject.file('LICENSE_SHORT')) | ||
removeUnusedImports() | ||
// removeUnusedImports() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I am having issue with spotless on my local and it is what I need to do for be able to build. I forget to remove it. |
||
trimTrailingWhitespace() | ||
} | ||
groovyGradle { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is out of date.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done