Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

null safety in debezium to airbyte message generation #41622

Merged
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.12 | 2024-07-10 | [\#41622](https://github.com/airbytehq/airbyte/pull/41622) | Fix null safety bug in debezium event processing |
| 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
| 0.40.10 | 2024-07-05 | [\#40719](https://github.com/airbytehq/airbyte/pull/40719) | Update test to refrlect isResumable field in catalog |
| 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | minor changes around error logging and testing |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.40.11
version=0.40.12
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,30 @@ class RelationalDbDebeziumEventConverter(
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)
val before: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode =
checkNotNull(debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
"ChangeEvent contains no source record $debeziumEvent"
}

if (listOf(before, after).all { it == null }) {
throw IllegalStateException(
"ChangeEvent contains no before or after records $debeziumEvent"
)
}
// Value of before and after may be a null or a NullNode object, representing a "null" in json
val baseNode = when (after?.isNull == true) {
true -> before
false -> after
} as ObjectNode

val baseNode = (if (after.isNull) before else after) as ObjectNode
val data: JsonNode =
DebeziumEventConverter.Companion.addCdcMetadata(
baseNode,
source,
cdcMetadataInjector,
after.isNull
after?.isNull == true
)
return DebeziumEventConverter.Companion.buildAirbyteMessage(
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.40.10'
cdkVersionRequired = '0.40.12'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intentional or done for testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to reply - sorry
This is the usual way of operation when making changes in CDK.
Until that CDK is published as a new version, the connector needs to consume a locally built CDK.
Once you publish a CDK we change this back to false ahead of merging.

}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.24
dockerImageTag: 3.4.25
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Loading
Loading