Skip to content

Commit bef31d1

Browse files
Destination S3-V2: Bug Fix: File transfer uses the part size for each part, not the file size
1 parent b309a7c commit bef31d1

File tree

4 files changed

+10
-5
lines changed

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationFile
1616
import io.airbyte.cdk.load.message.MultiProducerChannel
1717
import io.airbyte.cdk.load.message.object_storage.LoadablePart
1818
import io.airbyte.cdk.load.write.FileBatchAccumulator
19+
import io.github.oshai.kotlinlogging.KotlinLogging
1920
import java.io.File
2021
import java.nio.file.Path
2122

@@ -28,6 +29,8 @@ class FilePartAccumulator(
2829
private val stream: DestinationStream,
2930
private val outputQueue: MultiProducerChannel<BatchEnvelope<*>>,
3031
) : FileBatchAccumulator {
32+
val log = KotlinLogging.logger {}
33+
3134
override suspend fun processFilePart(file: DestinationFile, index: Long) {
3235
val key =
3336
Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}")
@@ -44,8 +47,9 @@ class FilePartAccumulator(
4447

4548
while (true) {
4649
val bytePart =
47-
ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt())
50+
ByteArray(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt())
4851
val read = fileInputStream.read(bytePart)
52+
log.info { "Read $read bytes from file" }
4953

5054
if (read == -1) {
5155
val filePart: ByteArray? = null
@@ -62,6 +66,7 @@ class FilePartAccumulator(
6266
handleFilePart(batch, stream.descriptor, index)
6367
}
6468
}
69+
fileInputStream.close()
6570
localFile.delete()
6671
}
6772

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class PartToObjectAccumulator<T : RemoteObject<*>>(
4848
val streamingUpload = upload.streamingUpload.await()
4949

5050
log.info {
51-
"Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (empty=${batch.part.isEmpty}; final=${batch.part.isFinal})"
51+
"Processing loadable part ${batch.part.partIndex} of ${batch.part.key} (size=${batch.part.bytes?.size}; final=${batch.part.isFinal})"
5252
}
5353

5454
// Upload provided bytes and update indexes.

airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class FilePartAccumulatorTest {
5050
fun testFilePartAccumulatorExactlyPartSize() = runTest {
5151
val finalDirectory = "finalDirectory"
5252
every { pathFactory.getFinalDirectory(stream) } returns finalDirectory
53-
val file = createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt())
53+
val file = createFile(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt())
5454
val index = 21L
5555
val fileMessage = createFileMessage(file)
5656

@@ -64,7 +64,7 @@ class FilePartAccumulatorTest {
6464
val finalDirectory = "finalDirectory"
6565
every { pathFactory.getFinalDirectory(stream) } returns finalDirectory
6666
val file =
67-
createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt() + 1000)
67+
createFile(ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES.toInt() + 1000)
6868
val index = 21L
6969
val fileMessage = createFileMessage(file)
7070

airbyte-integrations/connectors/destination-s3/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteBulkConnector {
77
core = 'load'
88
toolkits = ['load-s3', 'load-avro', 'load-aws']
9-
cdk = '0.296'
9+
cdk = 'local'
1010
}
1111

1212
application {

0 commit comments

Comments
 (0)