diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt index 7c0f18183247d..7a1bfeabe7890 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.test.util.destination_process.DestinationProcess import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory import io.airbyte.protocol.models.Jsons import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path import java.time.Instant import java.time.LocalDateTime import java.time.LocalTime @@ -40,6 +41,7 @@ import kotlinx.coroutines.runBlocking import org.apache.commons.lang3.RandomStringUtils import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInfo import org.junit.jupiter.api.extension.ExtendWith @@ -110,6 +112,8 @@ abstract class BasicPerformanceTest( val dataValidator: DataValidator? = null, val micronautProperties: Map = emptyMap(), namespaceOverride: String? 
/**
 * Runs a sync with file transfer enabled against a [SingleStreamFileTransfer] scenario.
 *
 * The scenario is built from [numFilesForFileTransfer] and [fileSizeMbForFileTransfer], named
 * after the running test method, and stages its files under `/tmp` via `setup()` before the
 * sync starts. No validation function is passed to the sync.
 *
 * Disabled here with the reason "Opt-in".
 * NOTE(review): presumably subclasses enable it by overriding with `@Test` — confirm.
 */
@Test
@Disabled("Opt-in")
open fun testFileTransfer() {
    // Build the scenario and materialize its files on disk in one step.
    val fileTransferScenario =
        SingleStreamFileTransfer(
            randomizedNamespace = randomizedNamespace,
            streamName = testInfo.testMethod.get().name,
            numFiles = numFilesForFileTransfer,
            fileSizeMb = fileSizeMbForFileTransfer,
            stagingDirectory = Path.of("/tmp"),
        ).also { it.setup() }

    runSync(testScenario = fileTransferScenario, useFileTransfer = true, validation = null)
}
/**
 * Single stream file-transfer performance scenario.
 *
 * [setup] writes [numFiles] files of [fileSizeMb] megabytes each into [stagingDirectory];
 * [send] then emits one [DestinationFile] message per file pointing at those paths.
 * [setup] must therefore be called before [send].
 *
 * @param randomizedNamespace namespace of the stream; also embedded in each file name.
 * @param streamName name of the stream; also embedded in each file name.
 * @param numFiles number of files to create and send.
 * @param fileSizeMb size of each file, in units of 1024 * 1024 bytes.
 * @param stagingDirectory directory the files are written to and referenced from.
 * @param seed seed for the pseudo-random file content, so content is reproducible.
 */
class SingleStreamFileTransfer(
    private val randomizedNamespace: String,
    private val streamName: String,
    private val numFiles: Int,
    private val fileSizeMb: Int,
    private val stagingDirectory: Path,
    private val seed: Long = 8656931613L
) : PerformanceTestScenario {
    private val log = KotlinLogging.logger {}

    private val descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName)

    // Widen to Long *before* multiplying: `fileSizeMb * 1024` evaluated as Int would
    // silently overflow for very large file sizes.
    private val fileSizeBytes: Long = fileSizeMb.toLong() * BYTES_PER_MB

    override val catalog: DestinationCatalog =
        DestinationCatalog(
            listOf(
                DestinationStream(
                    descriptor = descriptor,
                    importType = Append,
                    schema = ObjectTypeWithoutSchema,
                    generationId = 1,
                    minimumGenerationId = 1,
                    syncId = 101
                )
            )
        )

    /** Builds the file name for the file at [index]; shared by [setup] and [send]. */
    private fun makeFileName(index: Long): String =
        "test_file__${randomizedNamespace}__${streamName}__$index.txt"

    /**
     * Creates the test files in [stagingDirectory].
     *
     * A single pseudo-random megabyte is generated once and written [fileSizeMb] times to
     * every file, so all files have identical content.
     */
    fun setup() {
        // TODO: Maybe make these files different sizes?
        val prng = Random(seed)
        val randomMegabyte = ByteArray(BYTES_PER_MB) { prng.nextInt().toByte() }
        repeat(numFiles) { index ->
            val file = stagingDirectory.resolve(makeFileName(index.toLong()))
            log.info { "Creating file $file with size ${fileSizeMb}mb" }
            // `use` closes the stream even if a write fails (e.g. disk full), so the
            // file handle is not leaked.
            file.toFile().outputStream().use { out ->
                repeat(fileSizeMb) { out.write(randomMegabyte) }
            }
        }
    }

    /** Sends one [DestinationFile] message per staged file to [destination]. */
    override fun send(destination: DestinationProcess) {
        repeat(numFiles) { index ->
            val fileName = makeFileName(index.toLong())
            val message =
                DestinationFile(
                    descriptor,
                    System.currentTimeMillis(),
                    "",
                    DestinationFile.AirbyteRecordMessageFile(
                        fileUrl = stagingDirectory.resolve(fileName).toString(),
                        fileRelativePath = fileName,
                        bytes = fileSizeBytes,
                        modified = System.currentTimeMillis(),
                        sourceFileUrl = fileName,
                    )
                )
            destination.sendMessage(message.asProtocolMessage())
        }
    }

    /** Reports one record per file and the total number of bytes across all files. */
    override fun getSummary(): PerformanceTestScenario.Summary =
        PerformanceTestScenario.Summary(
            records = numFiles.toLong(),
            // Int * Long promotes to Long, so the total cannot overflow Int.
            size = numFiles * fileSizeBytes,
            expectedRecordsCount = numFiles.toLong()
        )

    private companion object {
        /** Number of bytes in one "mb" as used by this scenario. */
        const val BYTES_PER_MB = 1024 * 1024
    }
}