diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index bf32bdd1e401f..b98d741164fdf 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -40,7 +40,6 @@ import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.SyncMode; import java.sql.Connection; import java.sql.JDBCType; @@ -50,7 +49,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/airbyte-integrations/connectors/source-relational-db/build.gradle b/airbyte-integrations/connectors/source-relational-db/build.gradle index 83e6ec9268641..58cc47dfd17ab 100644 --- a/airbyte-integrations/connectors/source-relational-db/build.gradle +++ b/airbyte-integrations/connectors/source-relational-db/build.gradle @@ -11,6 +11,7 @@ dependencies { implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-json-validation') + implementation project(':airbyte-config:config-models') implementation 'org.apache.commons:commons-lang3:3.11' diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index d30a8374f4bb3..98c0d9e436759 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -13,6 +13,8 @@ import io.airbyte.commons.type.Types; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.helpers.StateMessageHelper; import io.airbyte.db.AbstractDatabase; import io.airbyte.db.IncrementalUtils; import io.airbyte.db.jdbc.JdbcDatabase; @@ -20,7 +22,6 @@ import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.relationaldb.models.DbState; -import io.airbyte.integrations.source.relationaldb.state.AirbyteStateMessageListTypeReference; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; import io.airbyte.protocol.models.AirbyteCatalog; @@ -521,16 +522,18 @@ private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exce * @return The deserialized object representation of the state. */ protected List deserializeInitialState(final JsonNode initialStateJson, final JsonNode config) { - if (initialStateJson == null) { - return generateEmptyInitialState(config); - } else { - try { - return Jsons.object(initialStateJson, new AirbyteStateMessageListTypeReference()); - } catch (final IllegalArgumentException e) { - LOGGER.warn("Defaulting to legacy state object..."); - return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(initialStateJson)); + final Optional typedState = StateMessageHelper.getTypedState(initialStateJson); + return typedState.map((state) -> { + switch (state.getStateType()) { + case GLOBAL: + return List.of(state.getGlobal()); + case STREAM: + return state.getStateMessages(); + case LEGACY: + default: + return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(state.getLegacyState())); } - } + }).orElse(generateEmptyInitialState(config)); } /** diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AirbyteStateMessageListTypeReference.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AirbyteStateMessageListTypeReference.java deleted file mode 100644 index c7e153e6d79a0..0000000000000 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AirbyteStateMessageListTypeReference.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.relationaldb.state; - -import com.fasterxml.jackson.core.type.TypeReference; -import io.airbyte.protocol.models.AirbyteStateMessage; -import java.util.List; - -public class AirbyteStateMessageListTypeReference extends TypeReference> { - -} diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java index 207b51ad5bad1..2fabade977264 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java @@ -92,7 +92,7 @@ protected Map createCursorInfoMap( final Map localMap = new HashMap<>(); final Map pairToState = streamSupplier.get() .stream() - .collect(Collectors.toMap(namespacePairFunction,Function.identity())); + .collect(Collectors.toMap(namespacePairFunction, Function.identity())); final Map pairToConfiguredAirbyteStream = catalog.getStreams().stream() .collect(Collectors.toMap(AirbyteStreamNameNamespacePair::fromConfiguredAirbyteSteam, Function.identity())); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java index 934cecb75f95a..41ae2a2e47b2a 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java @@ -94,7 +94,8 @@ private CdcState extractCdcState(final AirbyteStateMessage airbyteStateMessage) if (airbyteStateMessage.getType() == AirbyteStateType.GLOBAL) { return Jsons.object(airbyteStateMessage.getGlobal().getSharedState(), CdcState.class); } else { - return Jsons.object(airbyteStateMessage.getData(), DbState.class).getCdcState(); + final DbState legacyState = Jsons.object(airbyteStateMessage.getData(), DbState.class); + return legacyState != null ? legacyState.getCdcState() : null; } } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java index dbf0c4b0e8ef5..40fa957c71b53 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java @@ -148,7 +148,8 @@ public static Optional extractState(final AirbyteStreamState stat * Tests whether the provided {@link StreamDescriptor} is valid. A valid descriptor is defined as * one that has a non-{@code null} name. * - * See https://github.com/airbytehq/airbyte/blob/e63458fabb067978beb5eaa74d2bc130919b419f/docs/understanding-airbyte/airbyte-protocol.md + * See + * https://github.com/airbytehq/airbyte/blob/e63458fabb067978beb5eaa74d2bc130919b419f/docs/understanding-airbyte/airbyte-protocol.md * for more details * * @param streamDescriptor A {@link StreamDescriptor} to be validated. @@ -183,20 +184,6 @@ public static AirbyteStateMessage convertLegacyStateToGlobalState(final AirbyteS return new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withGlobal(globalState); } - /** - * Converts a {@link AirbyteStateType#GLOBAL} state message into a list of - * {@link AirbyteStateType#STREAM} messages. - * - * @param airbyteStateMessage A {@link AirbyteStateType#GLOBAL} state message. - * @return A list {@link AirbyteStateType#STREAM} state messages. - */ - public static List convertGlobalStateToStreamState(final AirbyteStateMessage airbyteStateMessage) { - return airbyteStateMessage.getGlobal().getStreamStates().stream() - .map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM) - .withStream(new AirbyteStreamState().withStreamDescriptor(s.getStreamDescriptor()).withStreamState(s.getStreamState()))) - .collect(Collectors.toList()); - } - /** * Converts a {@link AirbyteStateType#LEGACY} state message into a list of * {@link AirbyteStateType#STREAM} messages. diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java new file mode 100644 index 0000000000000..b9a47e3ba68c1 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.relationaldb; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import java.io.IOException; +import java.util.List; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link AbstractDbSource} class. + */ +public class AbstractDbSourceTest { + + @Test + void testDeserializationOfLegacyState() throws IOException { + final AbstractDbSource dbSource = spy(AbstractDbSource.class); + final JsonNode config = mock(JsonNode.class); + + final String legacyStateJson = MoreResources.readResource("states/legacy.json"); + final JsonNode legacyState = Jsons.deserialize(legacyStateJson); + + final List result = dbSource.deserializeInitialState(legacyState, config); + assertEquals(1, result.size()); + assertEquals(AirbyteStateType.LEGACY, result.get(0).getType()); + } + + @Test + void testDeserializationOfGlobalState() throws IOException { + final AbstractDbSource dbSource = spy(AbstractDbSource.class); + final JsonNode config = mock(JsonNode.class); + + final String globalStateJson = MoreResources.readResource("states/global.json"); + final JsonNode globalState = Jsons.deserialize(globalStateJson); + + final List result = dbSource.deserializeInitialState(globalState, config); + assertEquals(1, result.size()); + assertEquals(AirbyteStateType.GLOBAL, result.get(0).getType()); + } + + @Test + void testDeserializationOfStreamState() throws IOException { + final AbstractDbSource dbSource = spy(AbstractDbSource.class); + final JsonNode config = mock(JsonNode.class); + + final String streamStateJson = MoreResources.readResource("states/per_stream.json"); + final JsonNode streamState = Jsons.deserialize(streamStateJson); + + final List result = dbSource.deserializeInitialState(streamState, config); + assertEquals(2, result.size()); + assertEquals(AirbyteStateType.STREAM, result.get(0).getType()); + } + + @Test + void testDeserializationOfNullState() throws IOException { + final AbstractDbSource dbSource = spy(AbstractDbSource.class); + final JsonNode config = mock(JsonNode.class); + + final List result = dbSource.deserializeInitialState(null, config); + assertEquals(1, result.size()); + assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType()); + } + +} diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java index 282556984dea4..0a80b79c6f58b 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; @@ -202,4 +203,16 @@ void testToState() { assertEquals(expected, actualFirstEmission); } + @Test + void testToStateWithNoState() { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog(); + final StateManager stateManager = + new GlobalStateManager(new AirbyteStateMessage(), catalog); + + final AirbyteStateMessage airbyteStateMessage = stateManager.toState(Optional.empty()); + assertNotNull(airbyteStateMessage); + assertEquals(AirbyteStateType.GLOBAL, airbyteStateMessage.getType()); + assertEquals(0, airbyteStateMessage.getGlobal().getStreamStates().size()); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/global.json b/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/global.json new file mode 100644 index 0000000000000..5b1c5189b5fe7 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/global.json @@ -0,0 +1,49 @@ +[ + { + "type": "GLOBAL", + "global": { + "shared_state": { + "state": { + "foo": "bar", + "baz": 5 + } + }, + "stream_states": [ + { + "stream_descriptor": { + "name": "bicycles", + "namespace": "public" + }, + "stream_state": { + "stream_name": "bicycles", + "stream_namespace": "public", + "cursor_field": ["generation"] + } + }, + { + "stream_descriptor": { + "name": "cars", + "namespace": "public" + }, + "stream_state": { + "stream_name": "cars", + "stream_namespace": "public", + "cursor_field": ["year"], + "cursor": "a" + } + }, + { + "stream_descriptor": { + "name": "stationary_bicycles", + "namespace": "public" + }, + "stream_state": { + "stream_name": "stationary_bicycles", + "stream_namespace": "public", + "cursor_field": [] + } + } + ] + } + } +] diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/legacy.json b/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/legacy.json new file mode 100644 index 0000000000000..e20bdc5530872 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/legacy.json @@ -0,0 +1,17 @@ +{ + "cdc": false, + "streams": [ + { + "cursor": "4", + "stream_name": "cars", + "cursor_field": ["id"], + "stream_namespace": "public" + }, + { + "cursor": "1", + "stream_name": "us_states", + "cursor_field": ["id"], + "stream_namespace": "public" + } + ] +} diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/per_stream.json b/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/per_stream.json new file mode 100644 index 0000000000000..9644b13ed1569 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/test/resources/states/per_stream.json @@ -0,0 +1,32 @@ +[ + { + "type": "STREAM", + "stream": { + "stream_descriptor": { + "name": "id_and_name", + "namespace": "public" + }, + "stream_state": { + "stream_name": "id_and_name", + "stream_namespace": "public", + "cursor_field": ["id"], + "cursor": "5" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_descriptor": { + "name": "other", + "namespace": "public" + }, + "stream_state": { + "stream_name": "other", + "stream_namespace": "public", + "cursor_field": ["id"], + "cursor": "2" + } + } + } +]