From c0a46c1987735993901febe646b5b285334c2110 Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Tue, 4 Jan 2022 17:04:36 +0200 Subject: [PATCH] BufferedStreamConsumerTest: remove non-determinism in size of generated test records (#9274) * generate records fixed 40 bytes of size * fix buffer flush Signed-off-by: Sergey Chvalyuk --- .../buffered_stream_consumer/BufferedStreamConsumer.java | 2 +- .../buffered_stream_consumer/BufferedStreamConsumerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index c9aee79f28a5d..7d5f78e14bae0 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -147,7 +147,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { // are serialized again when writing to // the destination long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData())); - if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) { + if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { LOGGER.info("Flushing buffer..."); flushQueueToDestination(); bufferSizeInBytes = 0; diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 0b1c34ae743b5..01549f4b61b5f 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -315,7 +315,7 @@ private static List generateRecords(final long targetSizeInBytes List output = Lists.newArrayList(); long bytesCounter = 0; for (int i = 0;; i++) { - JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAscii(7), "name", "human " + String.format("%5d", i))); + JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAlphabetic(7), "name", "human " + String.format("%8d", i))); long sizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(payload)); bytesCounter += sizeInBytes; AirbyteMessage airbyteMessage = new AirbyteMessage()