diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index a0fbaa91712f0..ca13c26351f00 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -126,7 +126,7 @@ - name: Google PubSub destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692 dockerRepository: airbyte/destination-pubsub - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/destinations/pubsub icon: googlepubsub.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 59e11741de8f0..fb89aed4be8a4 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1972,7 +1972,7 @@ supported_destination_sync_modes: - "append" - "overwrite" -- dockerImage: "airbyte/destination-pubsub:0.1.5" +- dockerImage: "airbyte/destination-pubsub:0.1.6" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/pubsub" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-pubsub/Dockerfile b/airbyte-integrations/connectors/destination-pubsub/Dockerfile index 4bd1e25450c19..283e66285e679 100644 --- a/airbyte-integrations/connectors/destination-pubsub/Dockerfile +++ b/airbyte-integrations/connectors/destination-pubsub/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-pubsub COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/destination-pubsub diff --git a/airbyte-integrations/connectors/destination-pubsub/build.gradle b/airbyte-integrations/connectors/destination-pubsub/build.gradle index 6f6ecad1a8e20..14dc95279e67e 100644 --- a/airbyte-integrations/connectors/destination-pubsub/build.gradle +++ b/airbyte-integrations/connectors/destination-pubsub/build.gradle @@ -17,6 +17,8 @@ dependencies { implementation project(':airbyte-integrations:bases:base-java') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + testImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-pubsub') } diff --git a/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java b/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java index cef941529bf29..38f78ce0a7e39 100644 --- a/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java +++ b/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java @@ -39,7 +39,6 @@ public class PubsubConsumer extends FailureTrackingAirbyteMessageConsumer { private final Consumer outputRecordCollector; private final Map> attributes; private Publisher publisher; - private AirbyteMessage lastStateMessage; public PubsubConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, @@ -47,7 +46,6 @@ public PubsubConsumer(final JsonNode config, this.outputRecordCollector = outputRecordCollector; this.config = config; this.catalog = catalog; - this.lastStateMessage = null; this.attributes = Maps.newHashMap(); this.publisher = null; LOGGER.info("initializing consumer."); @@ -82,8 +80,7 @@ protected void startTracked() throws Exception { @Override protected void acceptTracked(final AirbyteMessage msg) throws Exception { if (msg.getType() == Type.STATE) { - lastStateMessage = msg; - outputRecordCollector.accept(lastStateMessage); + outputRecordCollector.accept(msg); return; } else if (msg.getType() != Type.RECORD) { return; @@ -114,7 +111,6 @@ protected void close(final boolean hasFailed) throws Exception { if (!hasFailed) { publisher.shutdown(); LOGGER.info("shutting down consumer."); - outputRecordCollector.accept(lastStateMessage); } } diff --git a/airbyte-integrations/connectors/destination-pubsub/src/test/java/io/airbyte/integration/destination/pubsub/PubsubConsumerTest.java b/airbyte-integrations/connectors/destination-pubsub/src/test/java/io/airbyte/integration/destination/pubsub/PubsubConsumerTest.java new file mode 100644 index 0000000000000..34ef00718d023 --- /dev/null +++ b/airbyte-integrations/connectors/destination-pubsub/src/test/java/io/airbyte/integration/destination/pubsub/PubsubConsumerTest.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integration.destination.pubsub; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.pubsub.PubsubConsumer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class PubsubConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Consumer outputRecordCollector; + + private PubsubConsumer consumer; + + @Mock + private JsonNode config; + @Mock + private ConfiguredAirbyteCatalog catalog; + + @BeforeEach + public void init() { + consumer = new PubsubConsumer(config, catalog, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } + +}