From 7b5eb44ef0b693c1eb9f5ad31b9bd08228b3eb6b Mon Sep 17 00:00:00 2001 From: kimerinn Date: Mon, 8 Aug 2022 12:39:37 +0300 Subject: [PATCH 1/5] 15310: Destination Scylla: Handle per-stream state --- .../destination-scylla/build.gradle | 1 + .../scylla/ScyllaMessageConsumer.java | 5 +-- .../scylla/ScyllaRecordConsumerTest.java | 42 +++++++++++++++++++ 3 files changed, 44 insertions(+), 4 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java diff --git a/airbyte-integrations/connectors/destination-scylla/build.gradle b/airbyte-integrations/connectors/destination-scylla/build.gradle index 9fcc858fe8114..21d45bb766fbb 100644 --- a/airbyte-integrations/connectors/destination-scylla/build.gradle +++ b/airbyte-integrations/connectors/destination-scylla/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation "com.scylladb:scylla-driver-core:${scyllaDriver}" + testImplementation project(':airbyte-integrations:bases:standard-destination-test') // https://mvnrepository.com/artifact/org.assertj/assertj-core testImplementation "org.assertj:assertj-core:${assertVersion}" // https://mvnrepository.com/artifact/org.testcontainers/testcontainers diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java index 8cfbc7d0e94f7..a03053e8a3dd0 100644 --- a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java @@ -27,8 +27,6 @@ public class ScyllaMessageConsumer extends FailureTrackingAirbyteMessageConsumer private final ScyllaCqlProvider scyllaCqlProvider; - private AirbyteMessage lastMessage = null; - public ScyllaMessageConsumer(ScyllaConfig scyllaConfig, ConfiguredAirbyteCatalog configuredCatalog, Consumer outputRecordCollector) { @@ -66,7 +64,7 @@ protected void acceptTracked(AirbyteMessage message) { var data = Jsons.serialize(messageRecord.getData()); scyllaCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data); } else if (message.getType() == AirbyteMessage.Type.STATE) { - this.lastMessage = message; + outputRecordCollector.accept(message); } else { LOGGER.warn("Unsupported airbyte message type: {}", message.getType()); } @@ -92,7 +90,6 @@ protected void close(boolean hasFailed) { LOGGER.error("Error while copying data to table {}: ", v.getTableName(), e); } }); - outputRecordCollector.accept(lastMessage); } scyllaStreams.forEach((k, v) -> { diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java new file mode 100644 index 0000000000000..f5f6214209182 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java @@ -0,0 +1,42 @@ +package io.airbyte.integrations.destination.scylla; + + +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension + +@DisplayName("ScyllaRecordConsumer") +@ExtendWith(MockitoExtension.class) +public class ScyllaRecordConsumerTest extends PerStreamStateMessageTest { + @Mock + private Consumer outputRecordCollector; + + private ScyllaMessageConsumer consumer; + + @Mock + ScyllaConfig scyllaConfig; + + @Mock + private ConfiguredAirbyteCatalog configuredCatalog; + + @BeforeEach + public void init() { + consumer = new ScyllaMessageConsumer(scyllaConfig, configuredCatalog, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } +} From 35a7347816369a564096462b3452ad1ac5ab4f23 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Mon, 8 Aug 2022 14:58:24 +0300 Subject: [PATCH 2/5] 15399: test fix --- .../destination/scylla/ScyllaRecordConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java index f5f6214209182..1454413326359 100644 --- a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java @@ -1,15 +1,15 @@ package io.airbyte.integrations.destination.scylla; - import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.junit.jupiter.MockitoExtension; @DisplayName("ScyllaRecordConsumer") @ExtendWith(MockitoExtension.class) From 8fc64a98bab49d7f2102b6cb9c1297fc979a46e1 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Tue, 9 Aug 2022 18:15:08 +0300 Subject: [PATCH 3/5] 15318: test fix --- .../scylla/ScyllaRecordConsumerTest.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java index 1454413326359..0686797bce14c 100644 --- a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java @@ -1,19 +1,24 @@ package io.airbyte.integrations.destination.scylla; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.integrations.util.HostPortResolver; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.testcontainers.containers.GenericContainer; @DisplayName("ScyllaRecordConsumer") @ExtendWith(MockitoExtension.class) public class ScyllaRecordConsumerTest extends PerStreamStateMessageTest { + private static ScyllaContainer scyllaContainer; + @Mock private Consumer outputRecordCollector; @@ -25,8 +30,24 @@ public class ScyllaRecordConsumerTest extends PerStreamStateMessageTest { @Mock private ConfiguredAirbyteCatalog configuredCatalog; + public static ScyllaContainer initContainer() { + if (scyllaContainer == null) { + scyllaContainer = new ScyllaContainer() + .withExposedPorts(9042) + // single cpu core cluster + .withCommand("--smp 1"); + } + scyllaContainer.start(); + return scyllaContainer; + } + @BeforeEach public void init() { + ScyllaContainer scyllaContainer = initContainer(); + JsonNode configJson = TestDataFactory.jsonConfig( + HostPortResolver.resolveHost(scyllaContainer), + HostPortResolver.resolvePort(scyllaContainer)); + var scyllaConfig = new ScyllaConfig(configJson); consumer = new ScyllaMessageConsumer(scyllaConfig, configuredCatalog, outputRecordCollector); } @@ -39,4 +60,12 @@ protected Consumer getMockedConsumer() { protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { return consumer; } + + static class ScyllaContainer extends GenericContainer { + + public ScyllaContainer() { + super("scylladb/scylla:4.5.0"); + } + + } } From bea820262ea01b45bbaa9ce2e0b495a104a1eb57 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Tue, 9 Aug 2022 23:52:06 +0300 Subject: [PATCH 4/5] 15318: updating version --- .../connectors/destination-scylla/Dockerfile | 2 +- docs/integrations/destinations/scylla.md | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-scylla/Dockerfile b/airbyte-integrations/connectors/destination-scylla/Dockerfile index f7e349e0c1b10..62da4ca4a3683 100644 --- a/airbyte-integrations/connectors/destination-scylla/Dockerfile +++ b/airbyte-integrations/connectors/destination-scylla/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-scylla COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-scylla diff --git a/docs/integrations/destinations/scylla.md b/docs/integrations/destinations/scylla.md index 14ae8435ec0b4..386a1fd418aae 100644 --- a/docs/integrations/destinations/scylla.md +++ b/docs/integrations/destinations/scylla.md @@ -42,5 +42,8 @@ and handle any amount of data from the connector. * Replication [optional] [default: 1] ### Setup guide +## Changelog -###### TODO: more info, screenshots?, etc... \ No newline at end of file +| Version | Date | Pull Request | Subject | +|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------| +| 0.1.3 | 2022-08-10 | [153999](https://github.com/airbytehq/airbyte/pull/15399) | handling per-stream state | From 3fac0a8146b09a26b04b3feaeaeda17289ad5375 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Tue, 9 Aug 2022 23:42:47 +0000 Subject: [PATCH 5/5] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index f9012e4e1cf54..63f6fe788372f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -297,7 +297,7 @@ - name: Scylla destinationDefinitionId: 3dc6f384-cd6b-4be3-ad16-a41450899bf0 dockerRepository: airbyte/destination-scylla - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/scylla icon: scylla.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index dfcfb8d225990..2ddd01880f89d 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -5040,7 +5040,7 @@ supported_destination_sync_modes: - "append" - "append_dedup" -- dockerImage: "airbyte/destination-scylla:0.1.2" +- dockerImage: "airbyte/destination-scylla:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/scylla" connectionSpecification: