Skip to content

Commit fc589c6

Browse files
committed
cdk changes for mysql dv2
1 parent 6ca1a76 commit fc589c6

File tree

25 files changed

+547
-348
lines changed

25 files changed

+547
-348
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import io.airbyte.protocol.models.v0.ConnectorSpecification
1414
import java.util.function.Consumer
1515

1616
abstract class SpecModifyingDestination(private val destination: Destination) : Destination {
17+
override val isV2Destination: Boolean = destination.isV2Destination
18+
1719
@Throws(Exception::class)
1820
abstract fun modifySpec(originalSpec: ConnectorSpecification): ConnectorSpecification
1921

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt

+15-15
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
package io.airbyte.cdk.testutils
55

6-
import com.fasterxml.jackson.databind.JsonNode
6+
import com.fasterxml.jackson.databind.node.ObjectNode
77
import com.google.common.collect.ImmutableMap
88
import io.airbyte.cdk.db.ContextQueryFunction
99
import io.airbyte.cdk.db.Database
@@ -55,16 +55,16 @@ protected constructor(val container: C) : AutoCloseable {
5555
@JvmField protected val databaseId: Int = nextDatabaseId.getAndIncrement()
5656
@JvmField
5757
protected val containerId: Int =
58-
containerUidToId!!.computeIfAbsent(container.containerId) { _: String? ->
59-
nextContainerId!!.getAndIncrement()
58+
containerUidToId.computeIfAbsent(container.containerId) { _: String? ->
59+
nextContainerId.getAndIncrement()
6060
}!!
6161
private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
6262

6363
init {
64-
LOGGER!!.info(formatLogLine("creating database " + databaseName))
64+
LOGGER!!.info(formatLogLine("creating database $databaseName"))
6565
}
6666

67-
protected fun formatLogLine(logLine: String?): String? {
67+
protected fun formatLogLine(logLine: String?): String {
6868
val retVal = "TestDatabase databaseId=$databaseId, containerId=$containerId - $logLine"
6969
return retVal
7070
}
@@ -100,7 +100,7 @@ protected constructor(val container: C) : AutoCloseable {
100100
* object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes
101101
* the [DataSource] and [DSLContext] owned by this object.
102102
*/
103-
open fun initialized(): T? {
103+
open fun initialized(): T {
104104
inContainerBootstrapCmd().forEach { cmds: Stream<String> -> this.execInContainer(cmds) }
105105
this.dataSource =
106106
DataSourceFactory.create(
@@ -165,12 +165,12 @@ protected constructor(val container: C) : AutoCloseable {
165165
databaseName
166166
)
167167

168-
val database: Database?
168+
val database: Database
169169
get() = Database(getDslContext())
170170

171171
protected fun execSQL(sql: Stream<String>) {
172172
try {
173-
database!!.query<Any?> { ctx: DSLContext? ->
173+
database.query<Any?> { ctx: DSLContext? ->
174174
sql.forEach { statement: String? ->
175175
LOGGER!!.info("executing SQL statement {}", statement)
176176
ctx!!.execute(statement)
@@ -228,12 +228,12 @@ protected constructor(val container: C) : AutoCloseable {
228228

229229
@Throws(SQLException::class)
230230
fun <X> query(transform: ContextQueryFunction<X>): X? {
231-
return database!!.query(transform)
231+
return database.query(transform)
232232
}
233233

234234
@Throws(SQLException::class)
235235
fun <X> transaction(transform: ContextQueryFunction<X>): X? {
236-
return database!!.transaction(transform)
236+
return database.transaction(transform)
237237
}
238238

239239
/** Returns a builder for the connector config object. */
@@ -245,7 +245,7 @@ protected constructor(val container: C) : AutoCloseable {
245245
return configBuilder().withHostAndPort().withCredentials().withDatabase()
246246
}
247247

248-
fun integrationTestConfigBuilder(): B? {
248+
fun integrationTestConfigBuilder(): B {
249249
return configBuilder().withResolvedHostAndPort().withCredentials().withDatabase()
250250
}
251251

@@ -260,8 +260,8 @@ protected constructor(val container: C) : AutoCloseable {
260260
) {
261261
protected val builder: ImmutableMap.Builder<Any, Any> = ImmutableMap.builder()
262262

263-
fun build(): JsonNode {
264-
return Jsons.jsonNode(builder.build())
263+
fun build(): ObjectNode {
264+
return Jsons.jsonNode(builder.build()) as ObjectNode
265265
}
266266

267267
@Suppress("UNCHECKED_CAST")
@@ -314,7 +314,7 @@ protected constructor(val container: C) : AutoCloseable {
314314

315315
private val nextDatabaseId: AtomicInteger = AtomicInteger(0)
316316

317-
private val nextContainerId: AtomicInteger? = AtomicInteger(0)
318-
private val containerUidToId: MutableMap<String?, Int?>? = ConcurrentHashMap()
317+
private val nextContainerId: AtomicInteger = AtomicInteger(0)
318+
private val containerUidToId: MutableMap<String?, Int?> = ConcurrentHashMap()
319319
}
320320
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+9-4
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
6565
namingResolver,
6666
sqlOperations
6767
)
68-
protected val configSchemaKey: String
69-
get() = "schema"
68+
protected open val configSchemaKey: String = "schema"
7069

7170
/**
7271
* If the destination should always disable type dedupe, override this method to return true. We
@@ -196,6 +195,11 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
196195
rawTableSchema: String
197196
): JdbcDestinationHandler<DestinationState>
198197

198+
protected open fun getV1V2Migrator(
199+
database: JdbcDatabase,
200+
databaseName: String
201+
): DestinationV1V2Migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
202+
199203
/**
200204
* Provide any migrations that the destination needs to run. Most destinations will need to
201205
* provide an instande of
@@ -306,6 +310,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
306310
typerDeduper,
307311
getDataTransformer(parsedCatalog, defaultNamespace),
308312
optimalBatchSizeBytes,
313+
parsedCatalog,
309314
)
310315
}
311316

@@ -317,7 +322,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
317322
val sqlGenerator = getSqlGenerator(config)
318323
val databaseName = getDatabaseName(config)
319324
val v2TableMigrator = NoopV2TableMigrator()
320-
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
325+
val migrator = getV1V2Migrator(database, databaseName)
321326
val destinationHandler: DestinationHandler<DestinationState> =
322327
getDestinationHandler(
323328
databaseName,
@@ -434,7 +439,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
434439
if (attemptInsert) {
435440
sqlOps.insertRecords(
436441
database,
437-
java.util.List.of(dummyRecord),
442+
listOf(dummyRecord),
438443
outputSchema,
439444
outputTableName,
440445
)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt

+52-10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
2424
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
2525
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter
2626
import io.airbyte.commons.json.Jsons
27+
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
28+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2729
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
2830
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
2931
import io.airbyte.protocol.models.v0.*
@@ -54,6 +56,7 @@ object JdbcBufferedConsumerFactory {
5456

5557
const val DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH = 25 * 1024 * 1024L
5658

59+
/** @param parsedCatalog Nullable for v1 destinations. Required for v2 destinations. */
5760
fun createAsync(
5861
outputRecordCollector: Consumer<AirbyteMessage>,
5962
database: JdbcDatabase,
@@ -65,9 +68,16 @@ object JdbcBufferedConsumerFactory {
6568
typerDeduper: TyperDeduper,
6669
dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
6770
optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH,
71+
parsedCatalog: ParsedCatalog? = null
6872
): SerializedAirbyteMessageConsumer {
6973
val writeConfigs =
70-
createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired)
74+
createWriteConfigs(
75+
namingResolver,
76+
config,
77+
catalog,
78+
sqlOperations.isSchemaRequired,
79+
parsedCatalog
80+
)
7181
return AsyncStreamConsumer(
7282
outputRecordCollector,
7383
onStartFunction(database, sqlOperations, writeConfigs, typerDeduper),
@@ -89,19 +99,28 @@ object JdbcBufferedConsumerFactory {
8999
namingResolver: NamingConventionTransformer,
90100
config: JsonNode,
91101
catalog: ConfiguredAirbyteCatalog?,
92-
schemaRequired: Boolean
102+
schemaRequired: Boolean,
103+
parsedCatalog: ParsedCatalog?
93104
): List<WriteConfig> {
94105
if (schemaRequired) {
95106
Preconditions.checkState(
96107
config.has("schema"),
97108
"jdbc destinations must specify a schema."
98109
)
99110
}
100-
return catalog!!
101-
.streams
102-
.stream()
103-
.map(toWriteConfig(namingResolver, config, schemaRequired))
104-
.collect(Collectors.toList())
111+
return if (parsedCatalog == null) {
112+
catalog!!
113+
.streams
114+
.stream()
115+
.map(toWriteConfig(namingResolver, config, schemaRequired))
116+
.collect(Collectors.toList())
117+
} else {
118+
// we should switch this to kotlin-style list processing, but meh for now
119+
parsedCatalog.streams
120+
.stream()
121+
.map(parsedStreamToWriteConfig(namingResolver))
122+
.collect(Collectors.toList())
123+
}
105124
}
106125

107126
private fun toWriteConfig(
@@ -150,6 +169,27 @@ object JdbcBufferedConsumerFactory {
150169
}
151170
}
152171

172+
private fun parsedStreamToWriteConfig(
173+
namingResolver: NamingConventionTransformer
174+
): Function<StreamConfig, WriteConfig> {
175+
return Function { streamConfig: StreamConfig ->
176+
// TODO We should probably replace WriteConfig with StreamConfig?
177+
// The only thing I'm not sure about is the tmpTableName thing,
178+
// but otherwise it's a strict improvement (avoids people accidentally
179+
// recomputing the table names, instead of just treating the output of
180+
// CatalogParser as canonical).
181+
WriteConfig(
182+
streamConfig.id.originalName,
183+
streamConfig.id.originalNamespace,
184+
streamConfig.id.rawNamespace,
185+
@Suppress("deprecation")
186+
namingResolver.getTmpTableName(streamConfig.id.rawNamespace),
187+
streamConfig.id.rawName,
188+
streamConfig.destinationSyncMode,
189+
)
190+
}
191+
}
192+
153193
/**
154194
* Defer to the [AirbyteStream]'s namespace. If this is not set, use the destination's default
155195
* schema. This namespace is source-provided, and can be potentially empty.
@@ -160,7 +200,7 @@ object JdbcBufferedConsumerFactory {
160200
private fun getOutputSchema(
161201
stream: AirbyteStream,
162202
defaultDestSchema: String,
163-
namingResolver: NamingConventionTransformer
203+
namingResolver: NamingConventionTransformer,
164204
): String {
165205
return if (isDestinationV2) {
166206
namingResolver.getNamespace(
@@ -252,8 +292,10 @@ object JdbcBufferedConsumerFactory {
252292
records: List<PartialAirbyteMessage> ->
253293
require(pairToWriteConfig.containsKey(pair)) {
254294
String.format(
255-
"Message contained record from a stream that was not in the catalog. \ncatalog: %s",
256-
Jsons.serialize(catalog)
295+
"Message contained record from a stream that was not in the catalog. \ncatalog: %s, \nstream identifier: %s\nkeys: %s",
296+
Jsons.serialize(catalog),
297+
pair,
298+
pairToWriteConfig.keys
257299
)
258300
}
259301
val writeConfig = pairToWriteConfig.getValue(pair)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,12 @@ abstract class JdbcSqlOperations : SqlOperations {
144144
val uuid = UUID.randomUUID().toString()
145145

146146
val jsonData = record.serialized
147-
val airbyteMeta = Jsons.serialize(record.record!!.meta)
147+
val airbyteMeta =
148+
if (record.record!!.meta == null) {
149+
"{\"changes\":[]}"
150+
} else {
151+
Jsons.serialize(record.record!!.meta)
152+
}
148153
val extractedAt =
149154
Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
150155
if (isDestinationV2) {

0 commit comments

Comments
 (0)