Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Common code to deserialize a state message in the new format #13772

Merged
merged 14 commits into from
Jun 18, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import java.util.List;
import java.util.Optional;

public class StateMessageHelper {

public static class AirbyteStateMessageListTypeReference extends TypeReference<List<AirbyteStateMessage>> {}

/**
* This a takes a json blob state and tries return either a legacy state in the format of a json
* object or a state message with the new format which is a list of airbyte state message.
*
* @param state
* @return Either a json blob (on the left) or a structure state message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is out of date.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
public static Optional<StateWrapper> getTypedState(JsonNode state) {
if (state == null) {
return Optional.empty();
} else {
try {
List<AirbyteStateMessage> stateMessages = Jsons.object(state, new AirbyteStateMessageListTypeReference());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make sure your IDE is turned on to add finals, please? docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) {
return Optional.of(new StateWrapper()
.withStateType(StateType.GLOBAL)
.withGlobal(stateMessages.get(0)));
} else if (stateMessages.size() >= 1
&& stateMessages.stream().filter(stateMessage -> stateMessage.getStateType() != AirbyteStateType.STREAM).toList().isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this >= 1 check? is stateMessages.stream().allMatch(stateMessage -> stateMessage.getStateType() == AirbyteStateType.STREAM) enough?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I do something similar to this in the StateManagerFactory by just doing looking at the first element and then switching from there. The assumption is that all state messages will have the same type. This may be a faulty assumption and could be remedied with a precondition check that makes sure that all messages have the same type. This would remove all of the size/index checking going on in this code. Something like:

final Optional<AirbyteStateMessage> airbyteStateMessage = stateMessages.stream().findFirst();
if (airbyteStateMessage.isPresent()) {
    switch(airbyteStateMessage.get().getStateType()) {
        case GLOBAL:
            return Optional.of(new StateWrapper() ...)
        case STREAM:
            return Optional(... )
        case LEGACY:
        default:
            return Optional(...)
    }
} else {
    // Handle empty state list?
}

Something like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered that having a validation would be better given the fact that this object comes from a blob and can be anything.

I need to check that the list contains at least element for the per stream state because having an empty list is not valid (There is no way to tell if it is global or per stream and we should save a state like that). That raised another question which is: is [] valid for a legacy state.

return Optional.of(new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(stateMessages));
} else {
throw new IllegalStateException("Unexpected state blob");
}

} catch (final IllegalArgumentException e) {
return Optional.of(new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(state));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nit but i would suggest moving this into a helper method. the catch being so far from where the exception is thrown isn't very easy to parse.

  /**
   * Parses a state object as a list of AirbyteStateMessages. If cannot be parsed as such, returns an empty optional.
   * @param state - state blob of unknown type either legacy (just a json blob) or global or stream which will be a list of AirbyteStateMessages.
   * @return optional of list of AirbyteStateMessages if parseable, otherwise empty optional.
   */
  private static Optional<List<AirbyteStateMessage>> getStateAsListOrEmpty(JsonNode state)
    try {
      return Optional.of(Jsons.object(state, new AirbyteStateMessageListTypeReference()));
    } catch (final IllegalArgumentException e) {
      return Optional.empty();
    }
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the catch block closer to the sentence throwing an exception. I believe that the state type makes if very explicit about what is being returned and thus the extra method is not required.

}
/*
* if (state == null) { // return Either.right(new ArrayList<>()); } try { return
* Either.right(Jsons.object(state, new AirbyteStateMessageListTypeReference())); } catch (final
* IllegalArgumentException e) { return Either.left(state); }
*/
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class StateMessageHelperTest {

@Test
public void testEmpty() {
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(null);
Assertions.assertThat(stateWrapper).isEmpty();
}

@Test
public void testLegacy() {
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.emptyObject());
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY);
}

@Test
public void testGlobal() {
AirbyteStateMessage stateMessage = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage)));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.GLOBAL);
Assertions.assertThat(stateWrapper.get().getGlobal()).isEqualTo(stateMessage);
}

@Test
public void testStream() {
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()));
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.STREAM);
Assertions.assertThat(stateWrapper.get().getStateMessages()).containsExactlyInAnyOrder(stateMessage1, stateMessage2);
}

@Test
public void testInvalidMixedState() {
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testDuplicatedGlobalState() {
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testEmptyStateList() {
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList())))
.isInstanceOf(IllegalStateException.class);
}

}
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ spotless {
eclipse('4.21.0').configFile(rootProject.file('tools/gradle/codestyle/java-google-style.xml'))

licenseHeaderFile createJavaLicenseWith(rootProject.file('LICENSE_SHORT'))
removeUnusedImports()
// removeUnusedImports()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I am having issue with spotless on my local and it is what I need to do for be able to build. I forget to remove it.

trimTrailingWhitespace()
}
groovyGradle {
Expand Down