Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

15310: Destination Scylla: Handle per-stream state #15399

Merged
merged 16 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@
- name: Scylla
destinationDefinitionId: 3dc6f384-cd6b-4be3-ad16-a41450899bf0
dockerRepository: airbyte/destination-scylla
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/scylla
icon: scylla.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5040,7 +5040,7 @@
supported_destination_sync_modes:
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-scylla:0.1.2"
- dockerImage: "airbyte/destination-scylla:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/scylla"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-scylla

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-scylla
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {

implementation "com.scylladb:scylla-driver-core:${scyllaDriver}"

testImplementation project(':airbyte-integrations:bases:standard-destination-test')
// https://mvnrepository.com/artifact/org.assertj/assertj-core
testImplementation "org.assertj:assertj-core:${assertVersion}"
// https://mvnrepository.com/artifact/org.testcontainers/testcontainers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public class ScyllaMessageConsumer extends FailureTrackingAirbyteMessageConsumer

private final ScyllaCqlProvider scyllaCqlProvider;

private AirbyteMessage lastMessage = null;

public ScyllaMessageConsumer(ScyllaConfig scyllaConfig,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
Expand Down Expand Up @@ -66,7 +64,7 @@ protected void acceptTracked(AirbyteMessage message) {
var data = Jsons.serialize(messageRecord.getData());
scyllaCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data);
} else if (message.getType() == AirbyteMessage.Type.STATE) {
this.lastMessage = message;
outputRecordCollector.accept(message);
} else {
LOGGER.warn("Unsupported airbyte message type: {}", message.getType());
}
Expand All @@ -92,7 +90,6 @@ protected void close(boolean hasFailed) {
LOGGER.error("Error while copying data to table {}: ", v.getTableName(), e);
}
});
outputRecordCollector.accept(lastMessage);
}

scyllaStreams.forEach((k, v) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.airbyte.integrations.destination.scylla;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.containers.GenericContainer;

@DisplayName("ScyllaRecordConsumer")
@ExtendWith(MockitoExtension.class)
public class ScyllaRecordConsumerTest extends PerStreamStateMessageTest {
private static ScyllaContainer scyllaContainer;

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private ScyllaMessageConsumer consumer;

@Mock
ScyllaConfig scyllaConfig;

@Mock
private ConfiguredAirbyteCatalog configuredCatalog;

public static ScyllaContainer initContainer() {
if (scyllaContainer == null) {
scyllaContainer = new ScyllaContainer()
.withExposedPorts(9042)
// single cpu core cluster
.withCommand("--smp 1");
}
scyllaContainer.start();
return scyllaContainer;
}

@BeforeEach
public void init() {
ScyllaContainer scyllaContainer = initContainer();
JsonNode configJson = TestDataFactory.jsonConfig(
HostPortResolver.resolveHost(scyllaContainer),
HostPortResolver.resolvePort(scyllaContainer));
var scyllaConfig = new ScyllaConfig(configJson);
consumer = new ScyllaMessageConsumer(scyllaConfig, configuredCatalog, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

static class ScyllaContainer extends GenericContainer<ScyllaContainer> {

public ScyllaContainer() {
super("scylladb/scylla:4.5.0");
}

}
}
5 changes: 4 additions & 1 deletion docs/integrations/destinations/scylla.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,8 @@ and handle any amount of data from the connector.
* Replication [optional] [default: 1]

### Setup guide
## Changelog

###### TODO: more info, screenshots?, etc...
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------|
| 0.1.3 | 2022-08-10 | [153999](https://github.com/airbytehq/airbyte/pull/15399) | handling per-stream state |