diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index c9aee79f28a5d..7d5f78e14bae0 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -147,7 +147,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { // are serialized again when writing to // the destination long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData())); - if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) { + if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) { LOGGER.info("Flushing buffer..."); flushQueueToDestination(); bufferSizeInBytes = 0; diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 0b1c34ae743b5..01549f4b61b5f 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -315,7 +315,7 @@ private static List generateRecords(final long targetSizeInBytes List output = Lists.newArrayList(); long bytesCounter = 0; for (int i = 0;; i++) { - JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAscii(7), "name", "human " + String.format("%5d", i))); + JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAlphabetic(7), "name", "human " + String.format("%8d", i))); long sizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(payload)); bytesCounter += sizeInBytes; AirbyteMessage airbyteMessage = new AirbyteMessage()