Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit the state to remove in the airbyte empty source #13725

Merged
merged 54 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
3d496d5
Emit the state to remove in the airbyte empty source
benmoriceau Jun 13, 2022
0e275d0
Handle legacy use case
benmoriceau Jun 13, 2022
78df38b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 13, 2022
f003ad3
Update names
benmoriceau Jun 13, 2022
f24472c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 14, 2022
d7a63b1
Add test for legacy
benmoriceau Jun 14, 2022
cbcdf92
tmp
benmoriceau Jun 14, 2022
6afbdbe
Common code to deserialize a state message in the new format
benmoriceau Jun 14, 2022
4331d10
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 14, 2022
09661f4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 14, 2022
f1269bf
tmp
benmoriceau Jun 14, 2022
ee7fd54
PR comments and type changed to typed
benmoriceau Jun 14, 2022
156e3c4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 14, 2022
23f943c
add new message support
benmoriceau Jun 15, 2022
780fe54
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 15, 2022
c255acd
Finish the tests
benmoriceau Jun 15, 2022
545e6ad
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 15, 2022
b2e3250
Format
benmoriceau Jun 15, 2022
8785ad7
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 15, 2022
93c138c
tmp
benmoriceau Jun 16, 2022
7707a97
Add StateType and StateWrapper objects to the model
gosusnp Jun 16, 2022
e3c881b
Merge branch 'gosusnp/add_state_wrapper' of github.com:airbytehq/airb…
benmoriceau Jun 16, 2022
27adc8b
Use state wrapper instead of Either
benmoriceau Jun 16, 2022
c3d9110
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 16, 2022
4ee75aa
Switch to optional
benmoriceau Jun 16, 2022
47d0ac4
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 16, 2022
c213f71
Merge branch 'master' into bmoric/extract-message-serialization
benmoriceau Jun 16, 2022
7387b4a
Some PR comments
benmoriceau Jun 16, 2022
ad3371d
PR comments
benmoriceau Jun 16, 2022
11167e6
Add a todo
benmoriceau Jun 16, 2022
0f4be01
Add test for legacy with config
benmoriceau Jun 17, 2022
280b114
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 17, 2022
9cb57f6
PR comments
benmoriceau Jun 17, 2022
fdbc10e
PR comments
benmoriceau Jun 17, 2022
0ea7eba
Support array legacy state
benmoriceau Jun 17, 2022
0ae05a2
format
benmoriceau Jun 17, 2022
30e3475
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 17, 2022
03b0eeb
Remove additional properties check
benmoriceau Jun 17, 2022
59eba26
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Jun 17, 2022
2749453
Merge branch 'bmoric/extract-message-serialization' of github.com:air…
benmoriceau Jun 17, 2022
859d81e
Small rename
benmoriceau Jun 17, 2022
60917b9
Rename variables
benmoriceau Jun 21, 2022
c14a9b7
PR comments
benmoriceau Jun 21, 2022
8c02f18
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 21, 2022
64d5389
extract ResetSourceConfig creation
benmoriceau Jun 21, 2022
7006dfa
Extract reset config creation
benmoriceau Jun 21, 2022
ce3fd6e
Fix test
benmoriceau Jun 21, 2022
d06e70e
rm error commit
benmoriceau Jun 21, 2022
c125b93
Add comments
benmoriceau Jun 22, 2022
fb242b8
format
benmoriceau Jun 23, 2022
5847625
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
44c306c
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
9eb52f8
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/res…
benmoriceau Jun 23, 2022
5cae07c
Use new state type
benmoriceau Jun 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import java.util.List;
import java.util.Optional;

public class StateMessageHelper {

public static class AirbyteStateMessageListTypeReference extends TypeReference<List<AirbyteStateMessage>> {}

/**
* This a takes a json blob state and tries return either a legacy state in the format of a json
* object or a state message with the new format which is a list of airbyte state message.
*
* @param state
* @return Either a json blob (on the left) or a structure state message.
*/
public static Optional<StateWrapper> getTypedState(JsonNode state) {
if (state == null) {
return Optional.empty();
} else {
try {
List<AirbyteStateMessage> stateMessages = Jsons.object(state, new AirbyteStateMessageListTypeReference());
if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) {
return Optional.of(new StateWrapper()
.withStateType(StateType.GLOBAL)
.withGlobal(stateMessages.get(0)));
} else if (stateMessages.size() >= 1
&& stateMessages.stream().filter(stateMessage -> stateMessage.getStateType() != AirbyteStateType.STREAM).toList().isEmpty()) {
return Optional.of(new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(stateMessages));
} else {
throw new IllegalStateException("Unexpected state blob");
}

} catch (final IllegalArgumentException e) {
return Optional.of(new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(state));
}
}
/*
* if (state == null) { // return Either.right(new ArrayList<>()); } try { return
* Either.right(Jsons.object(state, new AirbyteStateMessageListTypeReference())); } catch (final
* IllegalArgumentException e) { return Either.left(state); }
*/
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StateType.yaml
title: StateType
description: State Types
type: string
enum:
- global
- stream
- legacy
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StateWrapper.yaml
title: StateWrapper
description: Wrapper around the different type of States
type: object
additionalProperties: false
required:
- stateType
properties:
stateType:
description: The type of the state being wrapped
"$ref": StateType.yaml
legacyState:
description: Legacy State for states that haven't been migrated yet
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
global:
description: Representation of the shared
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteStateMessage
stateMessages:
type: array
items:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteStateMessage
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class StateMessageHelperTest {

@Test
public void testEmpty() {
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(null);
Assertions.assertThat(stateWrapper).isEmpty();
}

@Test
public void testLegacy() {
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.emptyObject());
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY);
}

@Test
public void testGlobal() {
AirbyteStateMessage stateMessage = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage)));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.GLOBAL);
Assertions.assertThat(stateWrapper.get().getGlobal()).isEqualTo(stateMessage);
}

@Test
public void testStream() {
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()));
Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.STREAM);
Assertions.assertThat(stateWrapper.get().getStateMessages()).containsExactlyInAnyOrder(stateMessage1, stateMessage2);
}

@Test
public void testInvalidMixedState() {
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testDuplicatedGlobalState() {
AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testEmptyStateList() {
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList())))
.isInstanceOf(IllegalStateException.class);
}

}
Loading