From b25716e056e533c5df44136470f298d03bb6a7b8 Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Wed, 14 Feb 2024 01:36:39 +0530 Subject: [PATCH 1/7] destination-async-framework: move the state emission logic into GlobalAsyncStateManager --- .../destination_async/FlushWorkers.java | 20 +- .../buffers/BufferManager.java | 11 +- .../buffers/MemoryAwareMessageBatch.java | 5 +- .../state/GlobalAsyncStateManager.java | 24 +- .../AsyncStreamConsumerTest.java | 4 +- .../buffers/BufferDequeueTest.java | 16 +- .../state/GlobalAsyncStateManagerTest.java | 396 +++++++++++++----- .../jdbc/JdbcBufferedConsumerFactory.java | 2 +- .../staging/StagingConsumerFactory.java | 2 +- 9 files changed, 322 insertions(+), 158 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java index 4b3d9f489519a..a4db507bb8c67 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java @@ -8,11 +8,8 @@ import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta; import io.airbyte.cdk.integrations.destination_async.state.FlushFailure; import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; -import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats; -import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -67,8 +64,6 @@ public class FlushWorkers implements AutoCloseable { private final AtomicBoolean isClosing; private final GlobalAsyncStateManager 
stateManager; - private final Object LOCK = new Object(); - public FlushWorkers(final BufferDequeue bufferDequeue, final DestinationFlushFunction flushFunction, final Consumer outputRecordCollector, @@ -172,7 +167,7 @@ private void flush(final StreamDescriptor desc, final UUID flushWorkerId) { AirbyteFileUtils.byteCountToDisplaySize(batch.getSizeInBytes())); flusher.flush(desc, batch.getData().stream().map(MessageWithMeta::message)); - emitStateMessages(batch.flushStates(stateIdToCount)); + batch.flushStates(stateIdToCount); } log.info("Flush Worker ({}) -- Worker finished flushing. Current queue size: {}", @@ -222,7 +217,7 @@ public void close() throws Exception { log.info("Closing flush workers -- all buffers flushed"); // before shutting down the supervisor, flush all state. - emitStateMessages(stateManager.flushStates()); + stateManager.flushStates(); supervisorThread.shutdown(); while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) { log.info("Waiting for flush worker supervisor to shut down"); @@ -239,17 +234,6 @@ public void close() throws Exception { debugLoop.shutdownNow(); } - private void emitStateMessages(final List partials) { - synchronized (LOCK) { - for (final PartialStateWithDestinationStats partial : partials) { - final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class); - message.getState().setDestinationStats(partial.stats()); - log.info("State with arrival number {} emitted from thread {}", partial.stateArrivalNumber(), Thread.currentThread().getName()); - outputRecordCollector.accept(message); - } - } - } - private static String humanReadableFlushWorkerId(final UUID flushWorkerId) { return flushWorkerId.toString().substring(0, 5); } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java 
b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java index 1d824a2b14c04..5834277839ffa 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java @@ -9,6 +9,7 @@ import io.airbyte.cdk.integrations.destination_async.FlushWorkers; import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; +import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; @@ -16,6 +17,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -38,8 +40,8 @@ public class BufferManager { public static final double MEMORY_LIMIT_RATIO = 0.7; - public BufferManager() { - this((long) (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO)); + public BufferManager(final Consumer outputRecordCollector) { + this((long) (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO), outputRecordCollector); } /** @@ -47,12 +49,13 @@ public BufferManager() { * GlobalMemoryManager will apply back pressure once this quota is filled. "Memory" can be * released back once flushing finishes. This number should be large enough we don't block * reading unnecessarily, but small enough we apply back pressure before OOMing. 
+ * @param outputRecordCollector consumer handed to the GlobalAsyncStateManager, which passes each flushed state message to it */ - public BufferManager(final long memoryLimit) { + public BufferManager(final long memoryLimit, final Consumer outputRecordCollector) { maxMemory = memoryLimit; LOGGER.info("Max 'memory' available for buffer allocation {}", FileUtils.byteCountToDisplaySize(maxMemory)); memoryManager = new GlobalMemoryManager(maxMemory); - this.stateManager = new GlobalAsyncStateManager(memoryManager); + this.stateManager = new GlobalAsyncStateManager(memoryManager, outputRecordCollector); buffers = new ConcurrentHashMap<>(); bufferEnqueue = new BufferEnqueue(memoryManager, buffers, stateManager); bufferDequeue = new BufferDequeue(memoryManager, buffers, stateManager); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java index 591837196c1ae..7710e3fc21620 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java @@ -7,7 +7,6 @@ import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta; import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; -import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats; import java.util.List; import java.util.Map; import org.slf4j.Logger; @@ -64,9 +63,9 @@ public void close() throws Exception { * * @return list of states that can be flushed */ - public List flushStates(final Map stateIdToCount) { + public void flushStates(final Map stateIdToCount) { stateIdToCount.forEach(stateManager::decrement); - return 
stateManager.flushStates(); + stateManager.flushStates(); } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java index fe4ea4a82c167..f89fc4eb23441 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java @@ -10,13 +10,13 @@ import com.google.common.base.Strings; import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; +import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.ArrayList; +import java.time.Instant; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -95,11 +96,13 @@ public class GlobalAsyncStateManager { private long retroactiveGlobalStateId = 0; // All access to this field MUST be guarded by a synchronized(lock) block private long arrivalNumber = 0; + private final Consumer outputRecordCollector; - private final Object LOCK = new Object(); + private static final Object LOCK = new Object(); - public 
GlobalAsyncStateManager(final GlobalMemoryManager memoryManager) { + public GlobalAsyncStateManager(final GlobalMemoryManager memoryManager, final Consumer outputRecordCollector) { this.memoryManager = memoryManager; + this.outputRecordCollector = outputRecordCollector; this.memoryAllocated = new AtomicLong(memoryManager.requestMemory()); this.memoryUsed = new AtomicLong(); } @@ -161,8 +164,7 @@ public void decrement(final long stateId, final long count) { * * @return list of state messages with no more inflight records. */ - public List flushStates() { - final List output = new ArrayList<>(); + public void flushStates() { Long bytesFlushed = 0L; synchronized (LOCK) { for (final Map.Entry> entry : descToStateIdQ.entrySet()) { @@ -195,8 +197,13 @@ public List flushStates() { if (allRecordsCommitted) { final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft(); final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue(); - output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(), - new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState), stateMessage.arrivalNumber())); + + log.info("State with arrival number {} emitted from thread {} at {}", stateMessage.arrivalNumber(), Thread.currentThread().getName(), + Instant.now().toString()); + final AirbyteMessage message = Jsons.deserialize(stateMessage.partialAirbyteStateMessage.getSerialized(), AirbyteMessage.class); + message.getState().setDestinationStats(new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState)); + outputRecordCollector.accept(message); + bytesFlushed += oldestState.getRight(); // cleanup @@ -212,7 +219,6 @@ public List flushStates() { } freeBytes(bytesFlushed); - return output; } private Long getStateIdAndIncrement(final StreamDescriptor streamDescriptor, final long increment) { diff --git 
a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java index 67bc7c7dc427c..906a4e36e439f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java @@ -118,7 +118,7 @@ void setup() { onClose, flushFunction, CATALOG, - new BufferManager(), + new BufferManager(outputRecordCollector), flushFailure, "default_ns"); @@ -204,7 +204,7 @@ void testBackPressure() throws Exception { (hasFailed, recordCounts) -> {}, flushFunction, CATALOG, - new BufferManager(1024 * 10), + new BufferManager(1024 * 10, outputRecordCollector), flushFailure, "default_ns"); when(flushFunction.getOptimalBatchSizeBytes()).thenReturn(0L); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java index eb565b90ec6d3..c7d877ad65bfd 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java @@ -11,10 +11,12 @@ import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage; import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.StreamDescriptor; 
import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.function.Consumer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -22,20 +24,20 @@ public class BufferDequeueTest { private static final int RECORD_SIZE_20_BYTES = 20; private static final String DEFAULT_NAMESPACE = "foo_namespace"; - public static final String RECORD_20_BYTES = "abc"; private static final String STREAM_NAME = "stream1"; private static final StreamDescriptor STREAM_DESC = new StreamDescriptor().withName(STREAM_NAME); private static final PartialAirbyteMessage RECORD_MSG_20_BYTES = new PartialAirbyteMessage() .withType(Type.RECORD) .withRecord(new PartialAirbyteRecordMessage() .withStream(STREAM_NAME)); + private final Consumer outputRecordCollector = c -> {}; @Nested class Take { @Test void testTakeShouldBestEffortRead() { - final BufferManager bufferManager = new BufferManager(); + final BufferManager bufferManager = new BufferManager(outputRecordCollector); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -57,7 +59,7 @@ void testTakeShouldBestEffortRead() { @Test void testTakeShouldReturnAllIfPossible() { - final BufferManager bufferManager = new BufferManager(); + final BufferManager bufferManager = new BufferManager(outputRecordCollector); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -74,7 +76,7 @@ void testTakeShouldReturnAllIfPossible() { @Test void testTakeFewerRecordsThanSizeLimitShouldNotError() { - final BufferManager bufferManager = new BufferManager(); + final BufferManager bufferManager = new BufferManager(outputRecordCollector); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -92,7 +94,7 @@ void testTakeFewerRecordsThanSizeLimitShouldNotError() { @Test void 
testMetadataOperationsCorrect() { - final BufferManager bufferManager = new BufferManager(); + final BufferManager bufferManager = new BufferManager(outputRecordCollector); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -120,7 +122,7 @@ void testMetadataOperationsCorrect() { @Test void testMetadataOperationsError() { - final BufferManager bufferManager = new BufferManager(); + final BufferManager bufferManager = new BufferManager(outputRecordCollector); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); final var ghostStream = new StreamDescriptor().withName("ghost stream"); @@ -136,7 +138,7 @@ void testMetadataOperationsError() { @Test void cleansUpMemoryForEmptyQueues() throws Exception { - final var bufferManager = new BufferManager(); + final var bufferManager = new BufferManager(outputRecordCollector); final var enqueue = bufferManager.getBufferEnqueue(); final var dequeue = bufferManager.getBufferDequeue(); final var memoryManager = bufferManager.getMemoryManager(); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java index fa1c60b1f0e2d..a30a3a6f91ee9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java @@ -8,19 +8,17 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; 
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteStateMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteStreamState; import io.airbyte.protocol.models.Jsons; +import io.airbyte.protocol.models.v0.*; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; -import io.airbyte.protocol.models.v0.AirbyteStateStats; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -45,58 +43,105 @@ class GlobalAsyncStateManagerTest { private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE1 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL)); + .withType(AirbyteStateType.GLOBAL)) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.GLOBAL, Jsons.jsonNode(ImmutableMap.of("cursor", 1)))); private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE2 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL)); + .withType(AirbyteStateType.GLOBAL)) + .withSerialized(serializedState(STREAM2_DESC, AirbyteStateType.GLOBAL, Jsons.jsonNode(ImmutableMap.of("cursor", 2)))); + private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE3 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL)); + .withType(AirbyteStateType.GLOBAL)) + .withSerialized(serializedState(STREAM3_DESC, AirbyteStateType.GLOBAL, Jsons.jsonNode(ImmutableMap.of("cursor", 2)))); private static final PartialAirbyteMessage 
STREAM1_STATE_MESSAGE1 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 1)))); private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE2 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 2)))); private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE3 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 3)))); private static final PartialAirbyteMessage STREAM2_STATE_MESSAGE = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM2_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM2_DESC))) + .withSerialized(serializedState(STREAM2_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 4)))); + + public static String serializedState(final StreamDescriptor streamDescriptor, final 
AirbyteStateType type, final JsonNode state) { + switch (type) { + case GLOBAL -> { + return Jsons.serialize(new AirbyteMessage().withType(Type.STATE).withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.GLOBAL) + .withGlobal(new AirbyteGlobalState() + .withSharedState(state) + .withStreamStates(Collections.singletonList(new AirbyteStreamState() + .withStreamState(Jsons.emptyObject()) + .withStreamDescriptor(streamDescriptor)))))); + + } + case STREAM -> { + return Jsons.serialize(new AirbyteMessage().withType(Type.STATE).withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamState(state) + .withStreamDescriptor(streamDescriptor)))); + } + default -> throw new RuntimeException("LEGACY STATE NOT SUPPORTED"); + } + } @Test void testBasic() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); final var firstStateId = stateManager.getStateIdAndIncrementCounter(STREAM1_DESC); final var secondStateId = stateManager.getStateIdAndIncrementCounter(STREAM1_DESC); assertEquals(firstStateId, secondStateId); stateManager.decrement(firstStateId, 2); + stateManager.flushStates(); // because no state message has been tracked, there is nothing to flush yet. 
- final Map stateWithStats = - stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); assertEquals(0, stateWithStats.size()); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats2 = - stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(2.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(2.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats2.values().stream().toList()); + } + + public AirbyteMessage attachDestinationStateStats(final AirbyteMessage stateMessage, final AirbyteStateStats airbyteStateStats) { + stateMessage.getState().withDestinationStats(airbyteStateStats); + return stateMessage; } @Nested @@ -104,21 +149,32 @@ class GlobalState { @Test void testEmptyQueuesGlobalState() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager 
stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); // GLOBAL stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(0.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + // + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); assertThrows(IllegalArgumentException.class, () -> stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE)); } @Test void testConversion() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var preConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); @@ -129,68 +185,114 @@ void testConversion() { // Since this is actually a global 
state, we can only flush after all streams are done. stateManager.decrement(preConvertId0, 10); - assertEquals(List.of(), stateManager.flushStates()); + stateManager.flushStates(); + assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(preConvertId1, 10); - assertEquals(List.of(), stateManager.flushStates()); + stateManager.flushStates(); + assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(preConvertId2, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(30.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(30.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); } @Test void testCorrectFlushingOneStream() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE1, 
STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(10.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + + emittedStatesFromDestination.clear(); final var afterConvertId1 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId1, 10); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE2.getSerialized(), 
AirbyteMessage.class), expectedDestinationStats)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats2.values().stream().toList()); } @Test void testZeroRecordFlushing() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(10.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats2 = stateManager.flushStates().stream() - 
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(0.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); + emittedStatesFromDestination.clear(); final var afterConvertId2 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId2, 10); - final Map stateWithStats3 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE3), stateWithStats3.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats3.values().stream().toList()); + stateManager.flushStates(); + final Map stateWithStats3 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE3.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats3.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), 
stateWithStats3.values().stream().toList()); } @Test void testCorrectFlushingManyStreams() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var preConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); @@ -198,20 +300,32 @@ void testCorrectFlushingManyStreams() { stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); stateManager.decrement(preConvertId1, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(20.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(20.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); final var afterConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var afterConvertId1 = 
simulateIncomingRecords(STREAM2_DESC, 10, stateManager); assertEquals(afterConvertId0, afterConvertId1); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId0, 20); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(20.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats2.values().stream().toList()); } } @@ -221,89 +335,148 @@ class PerStreamState { @Test void testEmptyQueues() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); // GLOBAL stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new 
AirbyteStateStats().withRecordCount(0.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(0.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); assertThrows(IllegalArgumentException.class, () -> stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE)); } @Test void testCorrectFlushingOneStream() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 3); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); + final Map stateWithStats = + 
emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + + emittedStatesFromDestination.clear(); stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 10); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(10.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals(List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); } @Test void testZeroRecordFlushing() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new 
GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 3); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + final AirbyteStateStats 
expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(0.0); + assertEquals(List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); + emittedStatesFromDestination.clear(); stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 10); - final Map stateWithStats3 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE3), stateWithStats3.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats3.values().stream().toList()); + stateManager.flushStates(); + final Map stateWithStats3 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + final AirbyteStateStats expectedDestinationStats3 = new AirbyteStateStats().withRecordCount(10.0); + assertEquals(List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE3.getSerialized(), AirbyteMessage.class), expectedDestinationStats3)), + stateWithStats3.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats3), stateWithStats3.values().stream().toList()); } @Test void testCorrectFlushingManyStream() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), 
emittedStatesFromDestination::add); final var stream1StateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); final var stream2StateId = simulateIncomingRecords(STREAM2_DESC, 7, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stream1StateId, 3); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); stateManager.decrement(stream2StateId, 4); - assertEquals(List.of(), stateManager.flushStates()); + stateManager.flushStates(); + assertEquals(List.of(), emittedStatesFromDestination); stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stream2StateId, 3); // only flush state if counter is 0. 
- final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM2_STATE_MESSAGE), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(7.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(7.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM2_STATE_MESSAGE.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); } } @@ -318,21 +491,18 @@ private static long simulateIncomingRecords(final StreamDescriptor desc, final l @Test void flushingRecordsShouldNotReduceStatsCounterForGlobalState() { - final PartialAirbyteMessage globalState = new PartialAirbyteMessage() - .withState(new PartialAirbyteStateMessage().withType(AirbyteStateType.GLOBAL)) - .withSerialized(Jsons.serialize(ImmutableMap.of("cursor", "1"))) - .withType(Type.STATE); - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); - + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); final long stateId = simulateIncomingRecords(STREAM1_DESC, 6, stateManager); stateManager.decrement(stateId, 4); - stateManager.trackState(globalState, 1, STREAM1_DESC.getNamespace()); - final List 
stateBeforeAllRecordsAreFlushed = stateManager.flushStates(); - assertEquals(0, stateBeforeAllRecordsAreFlushed.size()); + stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1, STREAM1_DESC.getNamespace()); + stateManager.flushStates(); + assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(stateId, 2); - List stateAfterAllRecordsAreFlushed = stateManager.flushStates(); - assertEquals(1, stateAfterAllRecordsAreFlushed.size()); - assertEquals(6.0, stateAfterAllRecordsAreFlushed.get(0).stats().getRecordCount()); + stateManager.flushStates(); + assertEquals(1, emittedStatesFromDestination.size()); + assertEquals(6.0, emittedStatesFromDestination.getFirst().getState().getDestinationStats().getRecordCount()); } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index d0d488c712840..268ef5618bc36 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -76,7 +76,7 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer Date: Wed, 14 Feb 2024 01:48:52 +0530 Subject: [PATCH 2/7] temp commit to test bigquery+snowflake --- .../connectors/destination-bigquery/build.gradle | 2 +- .../connectors/destination-snowflake/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index a751adb46023d..740b38b66a0eb 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ 
b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -11,7 +11,7 @@ airbyteJavaConnector { 's3-destinations', 'typing-deduping', ] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 44c45c891d739..e9b3482c365e5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -5,7 +5,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.20.2' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } java { From ad55a52d84db4c642897c9cb327e9b3bbf150263 Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Wed, 14 Feb 2024 02:02:21 +0530 Subject: [PATCH 3/7] fix compilation change for bigquery --- .../destination/bigquery/BigQueryRecordStandardConsumer.java | 2 +- .../destination/bigquery/BigQueryStagingConsumerFactory.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java index c0cd460cfdfa0..32d9e83ce411c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java @@ -35,7 +35,7 @@ public BigQueryRecordStandardConsumer(Consumer outputRecordColle onClose, new BigQueryAsyncStandardFlush(bigQuery, uploaderMap), catalog, - new BufferManager((long) (Runtime.getRuntime().maxMemory() * 
0.5)), + new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5), outputRecordCollector), defaultNamespace, Executors.newFixedThreadPool(2)); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index a929bfbf095f3..cdff88bebded2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -70,7 +70,7 @@ public SerializedAirbyteMessageConsumer createAsync( }, flusher, catalog, - new BufferManager(getBigQueryBufferMemoryLimit()), + new BufferManager(getBigQueryBufferMemoryLimit(), outputRecordCollector), defaultNamespace); } From 3ecb04c900d35f21feecf51a35a8feb15a06bff1 Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Thu, 15 Feb 2024 18:37:33 +0530 Subject: [PATCH 4/7] address review comments --- .../destination_async/FlushWorkers.java | 4 +- .../buffers/BufferManager.java | 11 ++- .../buffers/MemoryAwareMessageBatch.java | 13 ++-- .../state/GlobalAsyncStateManager.java | 13 ++-- .../AsyncStreamConsumerTest.java | 4 +- .../buffers/BufferDequeueTest.java | 15 ++-- .../state/GlobalAsyncStateManagerTest.java | 70 +++++++++---------- .../jdbc/JdbcBufferedConsumerFactory.java | 2 +- .../staging/StagingConsumerFactory.java | 2 +- .../BigQueryRecordStandardConsumer.java | 2 +- .../BigQueryStagingConsumerFactory.java | 2 +- 11 files changed, 63 insertions(+), 75 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java 
b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java index a4db507bb8c67..32b01c5702917 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java @@ -167,7 +167,7 @@ private void flush(final StreamDescriptor desc, final UUID flushWorkerId) { AirbyteFileUtils.byteCountToDisplaySize(batch.getSizeInBytes())); flusher.flush(desc, batch.getData().stream().map(MessageWithMeta::message)); - batch.flushStates(stateIdToCount); + batch.flushStates(stateIdToCount, outputRecordCollector); } log.info("Flush Worker ({}) -- Worker finished flushing. Current queue size: {}", @@ -217,7 +217,7 @@ public void close() throws Exception { log.info("Closing flush workers -- all buffers flushed"); // before shutting down the supervisor, flush all state. - stateManager.flushStates(); + stateManager.flushStates(outputRecordCollector); supervisorThread.shutdown(); while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) { log.info("Waiting for flush worker supervisor to shut down"); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java index 5834277839ffa..1d824a2b14c04 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferManager.java @@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.destination_async.FlushWorkers; import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import 
io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; -import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; @@ -17,7 +16,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -40,8 +38,8 @@ public class BufferManager { public static final double MEMORY_LIMIT_RATIO = 0.7; - public BufferManager(final Consumer outputRecordCollector) { - this((long) (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO), outputRecordCollector); + public BufferManager() { + this((long) (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO)); } /** @@ -49,13 +47,12 @@ public BufferManager(final Consumer outputRecordCollector) { * GlobalMemoryManager will apply back pressure once this quota is filled. "Memory" can be * released back once flushing finishes. This number should be large enough we don't block * reading unnecessarily, but small enough we apply back pressure before OOMing. 
- * @param outputRecordCollector */ - public BufferManager(final long memoryLimit, final Consumer outputRecordCollector) { + public BufferManager(final long memoryLimit) { maxMemory = memoryLimit; LOGGER.info("Max 'memory' available for buffer allocation {}", FileUtils.byteCountToDisplaySize(maxMemory)); memoryManager = new GlobalMemoryManager(maxMemory); - this.stateManager = new GlobalAsyncStateManager(memoryManager, outputRecordCollector); + this.stateManager = new GlobalAsyncStateManager(memoryManager); buffers = new ConcurrentHashMap<>(); bufferEnqueue = new BufferEnqueue(memoryManager, buffers, stateManager); bufferDequeue = new BufferDequeue(memoryManager, buffers, stateManager); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java index 7710e3fc21620..213f30e7768e7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java @@ -7,8 +7,10 @@ import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta; import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; +import io.airbyte.protocol.models.v0.AirbyteMessage; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,16 +58,13 @@ public void close() throws Exception { } /** - * For the batch, marks all the states that have now been flushed. Also returns states that can be - * flushed. 
This method is descriptrive, it assumes that whatever consumes the state messages emits - * them, internally it purges the states it returns. message that it can. + * For the batch, marks all the states that have now been flushed. Also writes the states that can + * be flushed back to platform via stateManager. *

- * - * @return list of states that can be flushed */ - public void flushStates(final Map stateIdToCount) { + public void flushStates(final Map stateIdToCount, final Consumer outputRecordCollector) { stateIdToCount.forEach(stateManager::decrement); - stateManager.flushStates(); + stateManager.flushStates(outputRecordCollector); } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java index f89fc4eb23441..845dfdd629ea0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java @@ -96,13 +96,11 @@ public class GlobalAsyncStateManager { private long retroactiveGlobalStateId = 0; // All access to this field MUST be guarded by a synchronized(lock) block private long arrivalNumber = 0; - private final Consumer outputRecordCollector; - private static final Object LOCK = new Object(); + private final Object LOCK = new Object(); - public GlobalAsyncStateManager(final GlobalMemoryManager memoryManager, final Consumer outputRecordCollector) { + public GlobalAsyncStateManager(final GlobalMemoryManager memoryManager) { this.memoryManager = memoryManager; - this.outputRecordCollector = outputRecordCollector; this.memoryAllocated = new AtomicLong(memoryManager.requestMemory()); this.memoryUsed = new AtomicLong(); } @@ -156,15 +154,12 @@ public void decrement(final long stateId, final long count) { } /** - * Returns state messages with no more inflight records i.e. counter = 0 across all streams. + * Flushes state messages with no more inflight records i.e. counter = 0 across all streams. 
* Intended to be called by {@link io.airbyte.cdk.integrations.destination_async.FlushWorkers} after * a worker has finished flushing its record batch. *

- * The return list of states should be emitted back to the platform. - * - * @return list of state messages with no more inflight records. */ - public void flushStates() { + public void flushStates(final Consumer outputRecordCollector) { Long bytesFlushed = 0L; synchronized (LOCK) { for (final Map.Entry> entry : descToStateIdQ.entrySet()) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java index 906a4e36e439f..67bc7c7dc427c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/AsyncStreamConsumerTest.java @@ -118,7 +118,7 @@ void setup() { onClose, flushFunction, CATALOG, - new BufferManager(outputRecordCollector), + new BufferManager(), flushFailure, "default_ns"); @@ -204,7 +204,7 @@ void testBackPressure() throws Exception { (hasFailed, recordCounts) -> {}, flushFunction, CATALOG, - new BufferManager(1024 * 10, outputRecordCollector), + new BufferManager(1024 * 10), flushFailure, "default_ns"); when(flushFunction.getOptimalBatchSizeBytes()).thenReturn(0L); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java index c7d877ad65bfd..669579c7af968 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java @@ -11,12 +11,10 @@ import 
io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage; import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.function.Consumer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -30,14 +28,13 @@ public class BufferDequeueTest { .withType(Type.RECORD) .withRecord(new PartialAirbyteRecordMessage() .withStream(STREAM_NAME)); - private final Consumer outputRecordCollector = c -> {}; @Nested class Take { @Test void testTakeShouldBestEffortRead() { - final BufferManager bufferManager = new BufferManager(outputRecordCollector); + final BufferManager bufferManager = new BufferManager(); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -59,7 +56,7 @@ void testTakeShouldBestEffortRead() { @Test void testTakeShouldReturnAllIfPossible() { - final BufferManager bufferManager = new BufferManager(outputRecordCollector); + final BufferManager bufferManager = new BufferManager(); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -76,7 +73,7 @@ void testTakeShouldReturnAllIfPossible() { @Test void testTakeFewerRecordsThanSizeLimitShouldNotError() { - final BufferManager bufferManager = new BufferManager(outputRecordCollector); + final BufferManager bufferManager = new BufferManager(); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -94,7 +91,7 @@ void testTakeFewerRecordsThanSizeLimitShouldNotError() { @Test void testMetadataOperationsCorrect() { - final 
BufferManager bufferManager = new BufferManager(outputRecordCollector); + final BufferManager bufferManager = new BufferManager(); final BufferEnqueue enqueue = bufferManager.getBufferEnqueue(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); @@ -122,7 +119,7 @@ void testMetadataOperationsCorrect() { @Test void testMetadataOperationsError() { - final BufferManager bufferManager = new BufferManager(outputRecordCollector); + final BufferManager bufferManager = new BufferManager(); final BufferDequeue dequeue = bufferManager.getBufferDequeue(); final var ghostStream = new StreamDescriptor().withName("ghost stream"); @@ -138,7 +135,7 @@ void testMetadataOperationsError() { @Test void cleansUpMemoryForEmptyQueues() throws Exception { - final var bufferManager = new BufferManager(outputRecordCollector); + final var bufferManager = new BufferManager(); final var enqueue = bufferManager.getBufferEnqueue(); final var dequeue = bufferManager.getBufferDequeue(); final var memoryManager = bufferManager.getMemoryManager(); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java index a30a3a6f91ee9..b77c4419cd1cb 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java @@ -111,14 +111,14 @@ public static String serializedState(final StreamDescriptor streamDescriptor, fi void testBasic() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + 
new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var firstStateId = stateManager.getStateIdAndIncrementCounter(STREAM1_DESC); final var secondStateId = stateManager.getStateIdAndIncrementCounter(STREAM1_DESC); assertEquals(firstStateId, secondStateId); stateManager.decrement(firstStateId, 2); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); // because no state message has been tracked, there is nothing to flush yet. final Map stateWithStats = emittedStatesFromDestination.stream() @@ -126,7 +126,7 @@ void testBasic() { assertEquals(0, stateWithStats.size()); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(2.0); final Map stateWithStats2 = @@ -151,11 +151,11 @@ class GlobalState { void testEmptyQueuesGlobalState() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); // GLOBAL stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(0.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -174,7 +174,7 @@ void testEmptyQueuesGlobalState() { void testConversion() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), 
emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var preConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); @@ -185,13 +185,13 @@ void testConversion() { // Since this is actually a global state, we can only flush after all streams are done. stateManager.decrement(preConvertId0, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(preConvertId1, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(preConvertId2, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(30.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -208,12 +208,12 @@ void testConversion() { void testCorrectFlushingOneStream() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(10.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -229,7 +229,7 @@ 
void testCorrectFlushingOneStream() { final var afterConvertId1 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId1, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final Map stateWithStats2 = emittedStatesFromDestination.stream() .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); @@ -244,12 +244,12 @@ void testCorrectFlushingOneStream() { void testZeroRecordFlushing() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(10.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -262,7 +262,7 @@ void testZeroRecordFlushing() { emittedStatesFromDestination.clear(); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(0.0); final Map stateWithStats2 = emittedStatesFromDestination.stream() @@ -277,7 +277,7 @@ void testZeroRecordFlushing() { final var afterConvertId2 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE, 
DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId2, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final Map stateWithStats3 = emittedStatesFromDestination.stream() .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); @@ -292,7 +292,7 @@ void testZeroRecordFlushing() { void testCorrectFlushingManyStreams() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var preConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); @@ -300,7 +300,7 @@ void testCorrectFlushingManyStreams() { stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); stateManager.decrement(preConvertId1, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(20.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -317,7 +317,7 @@ void testCorrectFlushingManyStreams() { assertEquals(afterConvertId0, afterConvertId1); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId0, 20); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final Map stateWithStats2 = emittedStatesFromDestination.stream() .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); @@ -337,11 +337,11 @@ class PerStreamState { void testEmptyQueues() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager 
stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); // GLOBAL stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(0.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -359,12 +359,12 @@ void testEmptyQueues() { void testCorrectFlushingOneStream() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 3); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -380,7 +380,7 @@ void testCorrectFlushingOneStream() { stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(10.0); final Map stateWithStats2 = emittedStatesFromDestination.stream() @@ -395,12 +395,12 @@ void testCorrectFlushingOneStream() { 
void testZeroRecordFlushing() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 3); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -413,7 +413,7 @@ void testZeroRecordFlushing() { emittedStatesFromDestination.clear(); stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final Map stateWithStats2 = emittedStatesFromDestination.stream() .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); @@ -427,7 +427,7 @@ void testZeroRecordFlushing() { stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 10); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final Map stateWithStats3 = emittedStatesFromDestination.stream() .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); @@ -442,14 +442,14 @@ void testZeroRecordFlushing() { void testCorrectFlushingManyStream() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), 
emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var stream1StateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); final var stream2StateId = simulateIncomingRecords(STREAM2_DESC, 7, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stream1StateId, 3); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); final Map stateWithStats = emittedStatesFromDestination.stream() @@ -462,12 +462,12 @@ void testCorrectFlushingManyStream() { emittedStatesFromDestination.clear(); stateManager.decrement(stream2StateId, 4); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); assertEquals(List.of(), emittedStatesFromDestination); stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stream2StateId, 3); // only flush state if counter is 0. 
- stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(7.0); final Map stateWithStats2 = emittedStatesFromDestination.stream() @@ -493,14 +493,14 @@ private static long simulateIncomingRecords(final StreamDescriptor desc, final l void flushingRecordsShouldNotReduceStatsCounterForGlobalState() { final List emittedStatesFromDestination = new ArrayList<>(); final GlobalAsyncStateManager stateManager = - new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES), emittedStatesFromDestination::add); + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final long stateId = simulateIncomingRecords(STREAM1_DESC, 6, stateManager); stateManager.decrement(stateId, 4); stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1, STREAM1_DESC.getNamespace()); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(stateId, 2); - stateManager.flushStates(); + stateManager.flushStates(emittedStatesFromDestination::add); assertEquals(1, emittedStatesFromDestination.size()); assertEquals(6.0, emittedStatesFromDestination.getFirst().getState().getDestinationStats().getRecordCount()); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 268ef5618bc36..d0d488c712840 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ 
b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -76,7 +76,7 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer outputRecordColle onClose, new BigQueryAsyncStandardFlush(bigQuery, uploaderMap), catalog, - new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5), outputRecordCollector), + new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)), defaultNamespace, Executors.newFixedThreadPool(2)); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index cdff88bebded2..a929bfbf095f3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -70,7 +70,7 @@ public SerializedAirbyteMessageConsumer createAsync( }, flusher, catalog, - new BufferManager(getBigQueryBufferMemoryLimit(), outputRecordCollector), + new BufferManager(getBigQueryBufferMemoryLimit()), defaultNamespace); } From 0f841cdc5def5931e412369d62895f706a57d646 Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Thu, 15 Feb 2024 23:19:19 +0530 Subject: [PATCH 5/7] changelog + bump version --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../java/airbyte-cdk/core/src/main/resources/version.properties | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f8d2a961fee45..d982ff531bd3b 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ 
b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Emit state messages to the platform from within the state manager itself. | | 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. | | 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | output logs to files in addition to stdout when running tests | | 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 92e02bc716a56..55d88e2da2a25 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.8 \ No newline at end of file +version=0.20.9 \ No newline at end of file From 6be361689a2e0efe807d1f1264f4df78c40bf9ef Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Fri, 16 Feb 2024 00:01:10 +0530 Subject: [PATCH 6/7] bump version for bigquery --- .../connectors/destination-bigquery/build.gradle | 4 ++-- .../connectors/destination-bigquery/metadata.yaml | 2 +- docs/integrations/destinations/bigquery.md | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index b91b960e6c50a..f5d1b05d4b547 100644 ---
a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -3,14 +3,14 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.8' + cdkVersionRequired = '0.20.9' features = [ 'db-destinations', 'datastore-bigquery', 'typing-deduping', 'gcs-destinations', ] - useLocalCdk = true + useLocalCdk = false } java { diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index e73aa2723d434..04c3fbdd22a2b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.4.9 + dockerImageTag: 2.4.10 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index e942362f8393f..1a6a55fa40e01 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -210,7 +210,8 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.4.9 | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285) | Adopt CDK 0.20.8 | +| 2.4.10 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | +| 2.4.9 | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285) | Adopt CDK 0.20.8 | | 2.4.8 | 2024-02-12 | 
[35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 | | 2.4.7 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 | | 2.4.6 | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575) | Adopt CDK 0.20.0 | From 9ecf43287e11a65e8ba167e9bae727f5689800f7 Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Fri, 16 Feb 2024 00:02:23 +0530 Subject: [PATCH 7/7] bump version for snowflake --- .../connectors/destination-snowflake/build.gradle | 4 ++-- .../connectors/destination-snowflake/metadata.yaml | 2 +- docs/integrations/destinations/snowflake.md | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index e9b3482c365e5..3cc7265e2df9d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,9 +3,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.2' + cdkVersionRequired = '0.20.9' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = true + useLocalCdk = false } java { diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index dd0c15064808d..2f49fddbe2942 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.5.11 + dockerImageTag: 3.5.12 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md 
b/docs/integrations/destinations/snowflake.md index d572da987adb0..3160d8c39dd92 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.5.12 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | | 3.5.11 | 2024-02-12 | [35194](https://github.com/airbytehq/airbyte/pull/35194) | Reorder auth options | | 3.5.10 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 | | 3.5.9 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |