Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

15308 Destination PubSub: Handle per-stream state #15355

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-pubsub

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ dependencies {
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-pubsub')
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ public class PubsubConsumer extends FailureTrackingAirbyteMessageConsumer {
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, Map<String, String>> attributes;
private Publisher publisher;
private AirbyteMessage lastStateMessage;

public PubsubConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
this.outputRecordCollector = outputRecordCollector;
this.config = config;
this.catalog = catalog;
this.lastStateMessage = null;
this.attributes = Maps.newHashMap();
this.publisher = null;
LOGGER.info("initializing consumer.");
Expand Down Expand Up @@ -82,8 +80,7 @@ protected void startTracked() throws Exception {
@Override
protected void acceptTracked(final AirbyteMessage msg) throws Exception {
if (msg.getType() == Type.STATE) {
lastStateMessage = msg;
outputRecordCollector.accept(lastStateMessage);
outputRecordCollector.accept(msg);
return;
} else if (msg.getType() != Type.RECORD) {
return;
Expand Down Expand Up @@ -114,7 +111,6 @@ protected void close(final boolean hasFailed) throws Exception {
if (!hasFailed) {
publisher.shutdown();
LOGGER.info("shutting down consumer.");
outputRecordCollector.accept(lastStateMessage);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integration.destination.pubsub;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.pubsub.PubsubConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class PubsubConsumerTest extends PerStreamStateMessageTest {

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private PubsubConsumer consumer;

@Mock
private JsonNode config;
@Mock
private ConfiguredAirbyteCatalog catalog;

@BeforeEach
public void init() {
consumer = new PubsubConsumer(config, catalog, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

}