Skip to content

Commit a8edb04

Browse files
committed
cdk changes for mysql dv2
1 parent 43ecebb commit a8edb04

File tree

20 files changed

+442
-305
lines changed

20 files changed

+442
-305
lines changed

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
196196
rawTableSchema: String
197197
): JdbcDestinationHandler<DestinationState>
198198

199+
protected open fun getV1V2Migrator(
200+
database: JdbcDatabase,
201+
databaseName: String
202+
): DestinationV1V2Migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
203+
199204
/**
200205
* Provide any migrations that the destination needs to run. Most destinations will need to
201206
* provide an instande of
@@ -306,6 +311,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
306311
typerDeduper,
307312
getDataTransformer(parsedCatalog, defaultNamespace),
308313
optimalBatchSizeBytes,
314+
parsedCatalog,
309315
)
310316
}
311317

@@ -317,7 +323,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
317323
val sqlGenerator = getSqlGenerator(config)
318324
val databaseName = getDatabaseName(config)
319325
val v2TableMigrator = NoopV2TableMigrator()
320-
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
326+
val migrator = getV1V2Migrator(database, databaseName)
321327
val destinationHandler: DestinationHandler<DestinationState> =
322328
getDestinationHandler(
323329
databaseName,
@@ -434,7 +440,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
434440
if (attemptInsert) {
435441
sqlOps.insertRecords(
436442
database,
437-
java.util.List.of(dummyRecord),
443+
listOf(dummyRecord),
438444
outputSchema,
439445
outputTableName,
440446
)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt

+47-10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
2424
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
2525
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter
2626
import io.airbyte.commons.json.Jsons
27+
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
28+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2729
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
2830
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
2931
import io.airbyte.protocol.models.v0.*
@@ -54,6 +56,9 @@ object JdbcBufferedConsumerFactory {
5456

5557
const val DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH = 25 * 1024 * 1024L
5658

59+
/**
60+
* @param parsedCatalog Nullable for v1 destinations. Required for v2 destinations.
61+
*/
5762
fun createAsync(
5863
outputRecordCollector: Consumer<AirbyteMessage>,
5964
database: JdbcDatabase,
@@ -65,9 +70,10 @@ object JdbcBufferedConsumerFactory {
6570
typerDeduper: TyperDeduper,
6671
dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
6772
optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH,
73+
parsedCatalog: ParsedCatalog? = null
6874
): SerializedAirbyteMessageConsumer {
6975
val writeConfigs =
70-
createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired)
76+
createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired, parsedCatalog)
7177
return AsyncStreamConsumer(
7278
outputRecordCollector,
7379
onStartFunction(database, sqlOperations, writeConfigs, typerDeduper),
@@ -89,19 +95,28 @@ object JdbcBufferedConsumerFactory {
8995
namingResolver: NamingConventionTransformer,
9096
config: JsonNode,
9197
catalog: ConfiguredAirbyteCatalog?,
92-
schemaRequired: Boolean
98+
schemaRequired: Boolean,
99+
parsedCatalog: ParsedCatalog?
93100
): List<WriteConfig> {
94101
if (schemaRequired) {
95102
Preconditions.checkState(
96103
config.has("schema"),
97104
"jdbc destinations must specify a schema."
98105
)
99106
}
100-
return catalog!!
101-
.streams
102-
.stream()
103-
.map(toWriteConfig(namingResolver, config, schemaRequired))
104-
.collect(Collectors.toList())
107+
return if (parsedCatalog == null) {
108+
catalog!!
109+
.streams
110+
.stream()
111+
.map(toWriteConfig(namingResolver, config, schemaRequired))
112+
.collect(Collectors.toList())
113+
} else {
114+
// we should switch this to kotlin-style list processing, but meh for now
115+
parsedCatalog.streams
116+
.stream()
117+
.map(parsedStreamToWriteConfig(namingResolver))
118+
.collect(Collectors.toList())
119+
}
105120
}
106121

107122
private fun toWriteConfig(
@@ -150,6 +165,26 @@ object JdbcBufferedConsumerFactory {
150165
}
151166
}
152167

168+
private fun parsedStreamToWriteConfig(
169+
namingResolver: NamingConventionTransformer
170+
): Function<StreamConfig, WriteConfig> {
171+
return Function { streamConfig: StreamConfig ->
172+
// TODO We should probably replace WriteConfig with StreamConfig?
173+
// The only thing I'm not sure about is the tmpTableName thing,
174+
// but otherwise it's a strict improvement (avoids people accidentally
175+
// recomputing the table names, instead of just treating the output of
176+
// CatalogParser as canonical).
177+
WriteConfig(
178+
streamConfig.id.originalName,
179+
streamConfig.id.originalNamespace,
180+
streamConfig.id.rawNamespace,
181+
namingResolver.getTmpTableName(streamConfig.id.rawNamespace),
182+
streamConfig.id.rawName,
183+
streamConfig.destinationSyncMode,
184+
)
185+
}
186+
}
187+
153188
/**
154189
* Defer to the [AirbyteStream]'s namespace. If this is not set, use the destination's default
155190
* schema. This namespace is source-provided, and can be potentially empty.
@@ -160,7 +195,7 @@ object JdbcBufferedConsumerFactory {
160195
private fun getOutputSchema(
161196
stream: AirbyteStream,
162197
defaultDestSchema: String,
163-
namingResolver: NamingConventionTransformer
198+
namingResolver: NamingConventionTransformer,
164199
): String {
165200
return if (isDestinationV2) {
166201
namingResolver.getNamespace(
@@ -252,8 +287,10 @@ object JdbcBufferedConsumerFactory {
252287
records: List<PartialAirbyteMessage> ->
253288
require(pairToWriteConfig.containsKey(pair)) {
254289
String.format(
255-
"Message contained record from a stream that was not in the catalog. \ncatalog: %s",
256-
Jsons.serialize(catalog)
290+
"Message contained record from a stream that was not in the catalog. \ncatalog: %s, \nstream identifier: %s\nkeys: %s",
291+
Jsons.serialize(catalog),
292+
pair,
293+
pairToWriteConfig.keys
257294
)
258295
}
259296
val writeConfig = pairToWriteConfig.getValue(pair)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ abstract class JdbcSqlOperations : SqlOperations {
142142
val uuid = UUID.randomUUID().toString()
143143

144144
val jsonData = record.serialized
145-
val airbyteMeta = Jsons.serialize(record.record!!.meta)
145+
val airbyteMeta = if (record.record!!.meta == null) {
146+
"{\"changes\":[]}"
147+
} else {
148+
Jsons.serialize(record.record!!.meta)
149+
}
146150
val extractedAt =
147151
Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
148152
if (isDestinationV2) {

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

+32-23
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ import org.slf4j.LoggerFactory
4141

4242
@Slf4j
4343
abstract class JdbcDestinationHandler<DestinationState>(
44-
protected val databaseName: String,
44+
// This feels hacky. MySQL doesn't have a notion of "schema".
45+
// In this case, the JDBC interface translates the schema parameter to mysql database.
46+
// This allows us to pass e.g. stream namespace into metadata.get(schema=?) for mysql.
47+
// So it's misleading, but mysql will use a null databaseName here to avoid problemss.
48+
protected val databaseName: String?,
4549
protected val jdbcDatabase: JdbcDatabase,
4650
protected val rawTableSchemaName: String,
4751
private val dialect: SQLDialect
@@ -50,10 +54,13 @@ abstract class JdbcDestinationHandler<DestinationState>(
5054
get() = DSL.using(dialect)
5155

5256
@Throws(Exception::class)
53-
private fun findExistingTable(id: StreamId): Optional<TableDefinition> {
57+
protected open fun findExistingTable(id: StreamId): Optional<TableDefinition> {
5458
return findExistingTable(jdbcDatabase, databaseName, id.finalNamespace, id.finalName)
5559
}
5660

61+
protected open fun getTableFromMetadata(dbmetadata: DatabaseMetaData, id: StreamId): ResultSet =
62+
dbmetadata.getTables(databaseName, id.rawNamespace, id.rawName, null)
63+
5764
@Throws(Exception::class)
5865
private fun isFinalTableEmpty(id: StreamId): Boolean {
5966
return !jdbcDatabase.queryBoolean(
@@ -80,9 +87,8 @@ abstract class JdbcDestinationHandler<DestinationState>(
8087
id.rawName
8188
)
8289
try {
83-
dbmetadata!!.getTables(databaseName, id.rawNamespace, id.rawName, null).use {
84-
table ->
85-
return@executeMetadataQuery table.next()
90+
getTableFromMetadata(dbmetadata!!, id).use {
91+
table -> return@executeMetadataQuery table.next()
8692
}
8793
} catch (e: SQLException) {
8894
LOGGER.error("Failed to retrieve table info from metadata", e)
@@ -340,22 +346,25 @@ abstract class JdbcDestinationHandler<DestinationState>(
340346
}
341347
}
342348

343-
private fun isAirbyteRawIdColumnMatch(existingTable: TableDefinition): Boolean {
344-
return existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID) &&
345-
toJdbcTypeName(AirbyteProtocolType.STRING) ==
346-
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_RAW_ID]!!.type
349+
protected open fun isAirbyteRawIdColumnMatch(existingTable: TableDefinition): Boolean {
350+
return toJdbcTypeName(AirbyteProtocolType.STRING).equals(
351+
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_RAW_ID]!!.type,
352+
ignoreCase = true,
353+
)
347354
}
348355

349-
private fun isAirbyteExtractedAtColumnMatch(existingTable: TableDefinition): Boolean {
350-
return existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) &&
351-
toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE) ==
352-
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT]!!.type
356+
protected open fun isAirbyteExtractedAtColumnMatch(existingTable: TableDefinition): Boolean {
357+
return toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(
358+
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT]!!.type,
359+
ignoreCase = true,
360+
)
353361
}
354362

355-
private fun isAirbyteMetaColumnMatch(existingTable: TableDefinition): Boolean {
356-
return existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) &&
357-
toJdbcTypeName(Struct(LinkedHashMap<String, AirbyteType>())) ==
358-
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_META]!!.type
363+
protected open fun isAirbyteMetaColumnMatch(existingTable: TableDefinition): Boolean {
364+
return toJdbcTypeName(Struct(LinkedHashMap<String, AirbyteType>())).equals(
365+
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_META]!!.type,
366+
ignoreCase = true,
367+
)
359368
}
360369

361370
private fun existingSchemaMatchesStreamConfig(
@@ -364,9 +373,9 @@ abstract class JdbcDestinationHandler<DestinationState>(
364373
): Boolean {
365374
// Check that the columns match, with special handling for the metadata columns.
366375
if (
367-
!isAirbyteRawIdColumnMatch(existingTable) ||
368-
!isAirbyteExtractedAtColumnMatch(existingTable) ||
369-
!isAirbyteMetaColumnMatch(existingTable)
376+
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID) && isAirbyteRawIdColumnMatch(existingTable)) ||
377+
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) && isAirbyteExtractedAtColumnMatch(existingTable)) ||
378+
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) && isAirbyteMetaColumnMatch(existingTable))
370379
) {
371380
// Missing AB meta columns from final table, we need them to do proper T+D so trigger
372381
// soft-reset
@@ -392,7 +401,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
392401
{
393402
map: LinkedHashMap<String?, String>,
394403
column: Map.Entry<String?, ColumnDefinition> ->
395-
map[column.key] = column.value.type
404+
map[column.key] = column.value.type.lowercase()
396405
},
397406
{ obj: LinkedHashMap<String?, String>, m: LinkedHashMap<String?, String>? ->
398407
obj.putAll(m!!)
@@ -475,9 +484,9 @@ abstract class JdbcDestinationHandler<DestinationState>(
475484
* @param airbyteType
476485
* @return
477486
*/
478-
protected abstract fun toJdbcTypeName(airbyteType: AirbyteType?): String
487+
protected abstract fun toJdbcTypeName(airbyteType: AirbyteType): String
479488

480-
protected abstract fun toDestinationState(json: JsonNode?): DestinationState
489+
protected abstract fun toDestinationState(json: JsonNode): DestinationState
481490

482491
companion object {
483492
private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDestinationHandler::class.java)

0 commit comments

Comments
 (0)