Skip to content

Commit

Permalink
[Destination-S3] File Xfer Local Performance Test
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Mar 9, 2025
1 parent c11be99 commit d5b0cb5
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.test.util.destination_process.DestinationProcess
import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory
import io.airbyte.protocol.models.Jsons
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.time.Instant
import java.time.LocalDateTime
import java.time.LocalTime
Expand All @@ -40,6 +41,7 @@ import kotlinx.coroutines.runBlocking
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.junit.jupiter.api.extension.ExtendWith
Expand Down Expand Up @@ -110,6 +112,8 @@ abstract class BasicPerformanceTest(
val dataValidator: DataValidator? = null,
val micronautProperties: Map<Property, String> = emptyMap(),
namespaceOverride: String? = null,
val numFilesForFileTransfer: Int = 5,
val fileSizeMbForFileTransfer: Int = 1024,
) {

protected val destinationProcessFactory = DestinationProcessFactory.get(emptyList())
Expand Down Expand Up @@ -282,6 +286,25 @@ abstract class BasicPerformanceTest(
testAppendRecordsWithDuplicates(null)
}

/**
 * Performance test for the file-transfer path.
 *
 * Builds a [SingleStreamFileTransfer] scenario from [numFilesForFileTransfer] and
 * [fileSizeMbForFileTransfer], has it write its input files under `/tmp`, and then runs a
 * sync with file transfer enabled. No validation function is passed to the sync.
 *
 * Annotated `@Disabled("Opt-in")`, so it does not run from this class as declared.
 */
@Test
@Disabled("Opt-in")
open fun testFileTransfer() {
    // The stream is named after the currently executing test method.
    val fileTransferScenario =
        SingleStreamFileTransfer(
                randomizedNamespace = randomizedNamespace,
                streamName = testInfo.testMethod.get().name,
                numFiles = numFilesForFileTransfer,
                fileSizeMb = fileSizeMbForFileTransfer,
                stagingDirectory = Path.of("/tmp"),
            )
            // Write the input files to the staging directory before the sync starts.
            .also { it.setup() }

    runSync(testScenario = fileTransferScenario, useFileTransfer = true, validation = null)
}

protected fun testAppendRecordsWithDuplicates(validation: ValidationFunction?) {
runSync(
testScenario =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.test.util.destination_process.DestinationProcess
import io.airbyte.protocol.models.Jsons
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.security.SecureRandom
import kotlin.random.Random

/**
* Single stream performance test.
Expand Down Expand Up @@ -150,6 +155,76 @@ class SingleStreamInsert(
)
}

/**
 * Single stream file-transfer performance scenario.
 *
 * [setup] writes [numFiles] files of [fileSizeMb] MiB each into [stagingDirectory]; [send] then
 * emits one [DestinationFile] message per file, pointing at the files written by [setup]. This
 * class does not delete the files it creates.
 *
 * @param randomizedNamespace namespace of the stream; also embedded in each generated file name
 * @param streamName name of the stream; also embedded in each generated file name
 * @param numFiles number of files to create and send
 * @param fileSizeMb size of each file, in units of 1024 * 1024 bytes
 * @param stagingDirectory directory the files are written to and referenced from
 * @param seed seed for the pseudo-random file content, so content is reproducible across runs
 */
class SingleStreamFileTransfer(
    private val randomizedNamespace: String,
    private val streamName: String,
    private val numFiles: Int,
    private val fileSizeMb: Int,
    private val stagingDirectory: Path,
    private val seed: Long = 8656931613L
) : PerformanceTestScenario {
    private val log = KotlinLogging.logger {}

    private val descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName)

    // Widen to Long *before* multiplying: Int arithmetic would silently wrap for large sizes.
    private val fileSizeBytes: Long = fileSizeMb.toLong() * 1024L * 1024L

    override val catalog: DestinationCatalog =
        DestinationCatalog(
            listOf(
                DestinationStream(
                    descriptor = descriptor,
                    importType = Append,
                    schema = ObjectTypeWithoutSchema,
                    generationId = 1,
                    minimumGenerationId = 1,
                    syncId = 101
                )
            )
        )

    /** Name of the [index]-th file; shared by [setup] and [send] so both refer to the same file. */
    private fun makeFileName(index: Long): String =
        "test_file__${randomizedNamespace}__${streamName}__$index.txt"

    /**
     * Creates the [numFiles] input files in [stagingDirectory].
     *
     * Every file consists of the same pseudo-random 1 MiB buffer written [fileSizeMb] times.
     */
    fun setup() {
        // TODO: Maybe make these files different sizes?
        val prng = Random(seed)
        val randomMegabyte = ByteArray(1024 * 1024) { prng.nextInt().toByte() }
        repeat(numFiles) { index ->
            val file = stagingDirectory.resolve(makeFileName(index.toLong()))
            log.info { "Creating file $file with size ${fileSizeMb}mb" }
            // `use` closes the stream even if a write fails part-way through.
            file.toFile().outputStream().use { outputStream ->
                repeat(fileSizeMb) { outputStream.write(randomMegabyte) }
            }
        }
    }

    /** Sends one [DestinationFile] message per file to [destination]. */
    override fun send(destination: DestinationProcess) {
        repeat(numFiles) { index ->
            val fileName = makeFileName(index.toLong())
            val message =
                DestinationFile(
                    descriptor,
                    System.currentTimeMillis(),
                    "",
                    DestinationFile.AirbyteRecordMessageFile(
                        fileUrl = stagingDirectory.resolve(fileName).toString(),
                        fileRelativePath = fileName,
                        bytes = fileSizeBytes,
                        modified = System.currentTimeMillis(),
                        sourceFileUrl = fileName,
                    )
                )
            destination.sendMessage(message.asProtocolMessage())
        }
    }

    /** Reports one record per file and the total number of bytes across all files. */
    override fun getSummary(): PerformanceTestScenario.Summary =
        PerformanceTestScenario.Summary(
            records = numFiles.toLong(),
            size = numFiles.toLong() * fileSizeBytes,
            expectedRecordsCount = numFiles.toLong()
        )
}

private fun Long.length(): Long =
if (this <= 99999) {
if (this <= 99) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@ class S3V2JsonNoFrillsPerformanceTest :
configContents = S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH),
configSpecClass = S3V2Specification::class.java,
defaultRecordsToInsert = 1_000_000,
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES,
numFilesForFileTransfer = 5,
fileSizeMbForFileTransfer = 1024,
) {
@Test
override fun testInsertRecords() {
super.testInsertRecords()
}

@Test
override fun testRefreshingRecords() {
super.testRefreshingRecords()
override fun testFileTransfer() {
super.testFileTransfer()
}
}

Expand All @@ -34,14 +31,4 @@ class S3V2ParquetSnappyPerformanceTest :
configSpecClass = S3V2Specification::class.java,
defaultRecordsToInsert = 1_000_000,
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES,
) {
@Test
override fun testInsertRecords() {
super.testInsertRecords()
}

@Test
override fun testRefreshingRecords() {
super.testRefreshingRecords()
}
}
)

0 comments on commit d5b0cb5

Please sign in to comment.