Skip to content

Commit

Permalink
WIP: Load CDK: New Interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Feb 18, 2025
1 parent 3f3fd6f commit 58b8266
Show file tree
Hide file tree
Showing 40 changed files with 1,146 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
}

/**
 * Looks up the stream registered under [descriptor] in `byDescriptor`.
 *
 * @throws IllegalArgumentException if no stream is registered for [descriptor].
 */
fun getStream(descriptor: DestinationStream.Descriptor): DestinationStream =
    byDescriptor[descriptor] ?: throw IllegalArgumentException("Stream not found: $descriptor")

/** Builds a [ConfiguredAirbyteCatalog] whose streams are the protocol form of [streams]. */
fun asProtocolObject(): ConfiguredAirbyteCatalog {
    val protocolCatalog = ConfiguredAirbyteCatalog()
    return protocolCatalog.withStreams(streams.map { stream -> stream.asProtocolObject() })
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,5 @@ class SyncBeanFactory {

@Singleton
@Named("openStreamQueue")
class OpenStreamQueue : ChannelMessageQueue<DestinationStream>()
class OpenStreamQueue : ChannelMessageQueue<DestinationStream>(Channel(Channel.UNLIMITED))
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ data class DestinationRecord(
Meta(
message.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
?: emptyList()
)
),
serialized.length.toLong()
)
}
}
Expand All @@ -159,6 +160,7 @@ data class DestinationRecordAirbyteValue(
val data: AirbyteValue,
val emittedAtMs: Long,
val meta: Meta?,
val serializedSizeBytes: Long
)

data class DestinationFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.airbyte.cdk.load.state.Reserved
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.channels.Channel

interface Sized {
val sizeBytes: Long
Expand Down Expand Up @@ -52,7 +53,9 @@ data class StreamFlushEvent(
override val sizeBytes: Long = 0L
}

class DestinationStreamEventQueue : ChannelMessageQueue<Reserved<DestinationStreamEvent>>()
class DestinationStreamEventQueue : ChannelMessageQueue<Reserved<DestinationStreamEvent>>(
Channel(Channel.UNLIMITED)
)

/**
* A supplier of message queues to which ([ReservationManager.reserve]'d) @ [DestinationStreamEvent]
Expand Down Expand Up @@ -97,4 +100,6 @@ data class GlobalCheckpointWrapped(
*/
@Singleton
@Secondary
class CheckpointMessageQueue : ChannelMessageQueue<Reserved<CheckpointMessageWrapped>>()
class CheckpointMessageQueue : ChannelMessageQueue<Reserved<CheckpointMessageWrapped>>(
Channel(Channel.UNLIMITED)
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

interface QueueReader<T> {
suspend fun consume(): Flow<T>
fun consume(): Flow<T>
suspend fun poll(): T?
}

Expand All @@ -23,12 +23,13 @@ interface QueueWriter<T> : CloseableCoroutine {

/** A queue that exposes both the [QueueReader] and the [QueueWriter] side for elements of type [T]. */
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
open val channel = Channel<T>(Channel.UNLIMITED)
open class ChannelMessageQueue<T>(
val channel: Channel<T>
) : MessageQueue<T> {
private val isClosed = AtomicBoolean(false)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
override fun consume(): Flow<T> = channel.receiveAsFlow()
override suspend fun poll(): T? = channel.tryReceive().getOrNull()
override suspend fun close() {
if (isClosed.setOnce()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import kotlinx.coroutines.channels.Channel
*/
class MultiProducerChannel<T>(
producerCount: Long,
override val channel: Channel<T>,
channel: Channel<T>,
private val name: String,
) : ChannelMessageQueue<T>() {
) : ChannelMessageQueue<T>(channel = channel) {
private val log = KotlinLogging.logger {}
private val initializedProducerCount = producerCount
private val producerCount = AtomicLong(producerCount)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.util.CloseableCoroutine
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow

/**
 * A fixed collection of [MessageQueue]s addressed by a zero-based partition index.
 *
 * Every operation that takes a partition validates it against the number of queues and throws
 * [IllegalArgumentException] when it is out of range.
 */
class PartitionedQueue<T>(
    private val queues: Array<MessageQueue<T>>,
) : CloseableCoroutine {
    /** Number of partitions, i.e. the number of underlying queues. */
    val partitions = queues.size

    /** Returns the flow produced by the queue backing [partition]. */
    fun consume(partition: Int): Flow<T> = queueFor(partition).consume()

    /** Publishes [value] to the queue backing [partition] only. */
    suspend fun publish(value: T, partition: Int) {
        queueFor(partition).publish(value)
    }

    /** Publishes [value] to every partition, one queue after another in index order. */
    suspend fun broadcast(value: T) {
        for (queue in queues) {
            queue.publish(value)
        }
    }

    /** Closes every underlying queue in index order. */
    override suspend fun close() {
        for (queue in queues) {
            queue.close()
        }
    }

    /** Resolves [partition] to its queue, rejecting indices outside `0 until partitions`. */
    private fun queueFor(partition: Int): MessageQueue<T> {
        require(partition in queues.indices) { "Invalid partition: $partition" }
        return queues[partition]
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import com.google.common.collect.RangeSet
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.CheckpointId

/**
 * Used internally by the CDK to pass messages between steps in the loader pipeline.
 *
 * @param K the key type carried by events; it must expose a stream via [WithStream].
 * @param T the payload type carried by events.
 */
sealed interface PipelineEvent<K : WithStream, T>

/**
 * A [PipelineEvent] that carries a payload together with its key.
 *
 * @property checkpointCounts a count per [CheckpointId]; presumably the number of records this
 * message contributes to each checkpoint (TODO confirm against the producer).
 * @property key the key this message is associated with.
 * @property value the payload.
 */
class PipelineMessage<K : WithStream, T>(
val checkpointCounts: Map<CheckpointId, Long>,
val key: K,
val value: T
) :
PipelineEvent<K, T>

/**
 * A [PipelineEvent] marking the end of a stream.
 *
 * The end message identifies the stream directly rather than carrying a key, because there is no
 * way to partition an empty message.
 *
 * @property stream descriptor of the stream that has ended.
 */
class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
PipelineEvent<K, T>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

/**
 * Used internally by the CDK to implement Loaders. It is added to the outputs of
 * [io.airbyte.cdk.load.pipeline.BatchAccumulator] that can ack or complete record batches. This is
 * done *when stitching the dev interface to the pipeline*, so the dev does not have to think about
 * internal state.
 */
interface WithBatchState {
/** The batch state reported by this output. */
val state: Batch.State
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.command.DestinationStream

/**
 * Used internally by the CDK to keep track of streams while still allowing for partitioning on
 * key. Any pipeline key type must implement this so the owning stream can always be recovered.
 */
interface WithStream {
/** Descriptor of the stream this key belongs to. */
val stream: DestinationStream.Descriptor
}

/**
 * The default key: partitioned by stream. Being a data class, two keys are equal exactly when
 * their [stream] descriptors are equal.
 */
data class StreamKey(override val stream: DestinationStream.Descriptor) : WithStream
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.WithStream

/**
 * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
 * should never need to implement this interface.
 *
 * @param K key type; exposes the stream via [WithStream].
 * @param S accumulator state type, created by [start] and threaded through [accept] and [finish].
 * @param T type of the records passed to [accept].
 * @param U type of the output produced by [accept] and [finish].
 */
interface BatchAccumulator<K : WithStream, S, T, U> {
/** Creates the initial state for [key] and the given [part] number. */
fun start(key: K, part: Int): S
/**
 * Feeds [record] into [state] and returns the state to use next, paired with an optional output.
 * NOTE(review): a null output presumably means "nothing to emit yet" — confirm with the caller.
 */
fun accept(record: T, state: S): Pair<S, U?>
/** Finalizes [state] and returns its (non-null) output. */
fun finish(state: S): U
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import com.google.common.collect.RangeSet
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.state.CheckpointId

/**
 * Used internally by the CDK to track record ranges to ack. Every update names the stream it
 * applies to.
 */
sealed interface BatchUpdate {
/** Descriptor of the stream this update applies to. */
val stream: DestinationStream.Descriptor
}

/**
 * A [BatchUpdate] reporting a batch state for a stream.
 *
 * @property stream descriptor of the stream this update applies to.
 * @property checkpointCounts a count per [CheckpointId]; presumably the number of records covered
 * by this update for each checkpoint (TODO confirm against the consumer).
 * @property state the batch state being reported.
 */
data class BatchStateUpdate(
override val stream: DestinationStream.Descriptor,
val checkpointCounts: Map<CheckpointId, Long>,
val state: Batch.State,
) : BatchUpdate

/**
 * A [BatchUpdate] that carries only a stream descriptor; by its name it signals that the stream
 * has ended.
 *
 * @property stream descriptor of the stream this update applies to.
 */
data class BatchEndOfStream(
override val stream: DestinationStream.Descriptor,
) : BatchUpdate
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
import io.airbyte.cdk.load.write.DirectLoader
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton

/**
 * Used internally by the CDK to implement the DirectLoader.
 *
 * Creates a single pipeline step reading from a (possibly partitioned) record stream. Batch updates
 * are written to the batchStateUpdateQueue whenever the loader reports a finished batch.
 * NOTE(review): the original sentence ended at "whenever the loader returns"; presumably it meant
 * the loader returning its "complete" result — confirm.
 *
 * The bean is only created when the property `airbyte.destination.core.load-pipeline.strategy`
 * has the value `direct`.
 */
@Singleton
@Requires(property = "airbyte.destination.core.load-pipeline.strategy", value = "direct")
class DirectLoadPipeline(
val pipelineStep: DirectLoadPipelineStep<*, *>
) : LoadPipeline(listOf(pipelineStep))
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
import io.airbyte.cdk.load.write.DirectLoader
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton

/**
 * The pipeline step that feeds one partition of the record queue through the
 * [DirectLoadRecordAccumulator].
 *
 * [taskForPartition] builds a [LoadPipelineStepTask] that consumes the requested partition of
 * [inputQueue] and is given [batchQueue] as its batch-update queue. When [batchSizeOverride] is
 * set, the task also receives a [RecordCountFlushStrategy] built from it; otherwise that argument
 * is `null`.
 */
@Singleton
class DirectLoadPipelineStep<K : WithStream, S : DirectLoader>(
    val accumulator: DirectLoadRecordAccumulator<K, S>,
    @Named("recordQueue")
    val inputQueue: PartitionedQueue<PipelineEvent<K, DestinationRecordAirbyteValue>>,
    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
    val batchSizeOverride: Long? = null,
    @Value("\${airbyte.destination.core.load-pipeline.input-parts:1}")
    override val numWorkers: Int,
) : LoadPipelineStep {
    /** Creates the task that processes the records of [partition]. */
    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
        // Resolve the partition's flow first so an invalid partition fails before anything else.
        val partitionRecords = inputQueue.consume(partition)
        return LoadPipelineStepTask(
            accumulator,
            partitionRecords,
            batchUpdateQueue = batchQueue,
            outputPartitioner = NoopPartitioner(),
            // NOTE(review): presumably the output queue, absent because this is the only step.
            null,
            batchSizeOverride?.let { maxRecords -> RecordCountFlushStrategy(maxRecords) },
            partition,
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.WithBatchState
import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.cdk.load.write.DirectLoader.*
import io.airbyte.cdk.load.write.DirectLoaderFactory
import jakarta.inject.Singleton

/** Output of the direct-load accumulator: carries only the resulting batch [state]. */
data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState

/**
 * Used internally by the CDK to adapt the client-provided [DirectLoader] to the generic
 * [BatchAccumulator] interface so it can run as a pipeline step. The loader itself serves as the
 * accumulator state, and its public results are translated into [DirectLoadAccResult]s here,
 * hiding the internal mechanics from the connector developer.
 */
@Singleton
class DirectLoadRecordAccumulator<K : WithStream, S : DirectLoader>(
    val directLoaderFactory: DirectLoaderFactory<S>,
) : BatchAccumulator<K, DirectLoader, DestinationRecordAirbyteValue, DirectLoadAccResult> {
    /** Asks the factory for a new loader for the key's stream and the given part. */
    override fun start(key: K, part: Int): DirectLoader =
        directLoaderFactory.create(key.stream, part)

    /**
     * Hands [record] to the loader. The same loader is always returned as the next state; a
     * COMPLETE result accompanies it only when the loader answers [Complete].
     */
    override fun accept(
        record: DestinationRecordAirbyteValue,
        state: DirectLoader,
    ): Pair<DirectLoader, DirectLoadAccResult?> =
        when (state.accept(record)) {
            is Incomplete -> state to null
            is Complete -> state to DirectLoadAccResult(Batch.State.COMPLETE)
        }

    /** Finishes the loader and reports the batch as COMPLETE. */
    override fun finish(state: DirectLoader): DirectLoadAccResult {
        state.finish()
        return DirectLoadAccResult(Batch.State.COMPLETE)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import kotlin.math.abs

/**
 * A dev interface for expressing how incoming data is partitioned. By default, data will be
 * partitioned by a hash of the stream name and namespace.
 */
interface InputPartitioner {
/**
 * Returns the partition for [record], given [numParts] partitions in total.
 * NOTE(review): consumers presumably expect a value in `0 until numParts` — confirm.
 */
fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int
}

/**
 * Default [InputPartitioner]: derives the partition from the hash code of the record's stream, so
 * all records of one stream map to the same partition for a given [getPartition] `numParts`.
 */
@Singleton
@Secondary
class ByStreamInputPartitioner : InputPartitioner {
    /**
     * Returns a partition in `0 until numParts` for [record].
     *
     * @param numParts total number of partitions; must be non-zero (a zero value throws
     * [ArithmeticException], as before).
     */
    override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
        // Reduce modulo numParts *before* taking the absolute value: abs(Int.MIN_VALUE) overflows
        // and stays negative, which would yield a negative partition. The remainder is always
        // strictly smaller in magnitude than numParts, so abs() on it cannot overflow. For every
        // other hash value this gives the same partition as abs(hash) % numParts.
        return abs(record.stream.hashCode() % numParts)
    }
}
Loading

0 comments on commit 58b8266

Please sign in to comment.