Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Source-postgres] : Compare each record’s lsn_commit value instead of lsn_proc. #35939

Merged
merged 15 commits into from
Mar 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.16
dockerImageTag: 3.3.17
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.integrations.source.postgres.cdc;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.db.PgLsn;
import io.airbyte.cdk.db.PostgresUtils;
Expand All @@ -14,6 +17,7 @@
import io.airbyte.cdk.integrations.debezium.internals.SnapshotMetadata;
import io.airbyte.commons.json.Jsons;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -105,10 +109,44 @@ public boolean isEventAheadOffset(final Map<String, String> offset, final Change

final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);

final String offset_lsn =
offsetJson.get("lsn_commit") != null ? String.valueOf(offsetJson.get("lsn_commit")) : String.valueOf(offsetJson.get("lsn"));
final String event_lsn = String.valueOf(event.eventValueAsJson().get("source").get("lsn"));
return Long.parseLong(event_lsn) > Long.parseLong(offset_lsn);
final String stateOffsetLsnCommit = offsetJson.get("lsn_commit") != null ? String.valueOf(offsetJson.get("lsn_commit")) : null;
if (stateOffsetLsnCommit == null) {
return false;
}

try {
ObjectMapper objectMapper = new ObjectMapper();
TypeReference<List<String>> listType = new TypeReference<>() {};
/* The event source structure is :
{
"version":"2.4.0.Final",
"connector":"postgresql",
"name":"db_pkgzzfnybb",
"ts_ms":1710283178042,
"snapshot":"false",
"db":"db_pkgzzfnybb",
"sequence":"[\"30660608\",\"30660608\"]",
"schema":"models_schema",
"table":"models",
"txId":777,
"lsn":30660608,
"xmin":null
}
See https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events for the full event structure.
*/
final JsonNode lsnSequenceNode = event.eventValueAsJson().get("source").get("sequence");
List<String> lsnSequence = objectMapper.readValue(lsnSequenceNode.asText(), listType);
// The sequence field is a pair of [lsn_commit, lsn_processed]. We want to make sure lsn_commit(event) is compared against
// lsn_commit(state_offset). For the event, either of the lsn values can be null.
String eventLsnCommit = lsnSequence.get(0);
if (eventLsnCommit == null) {
return false;
}
return Long.parseLong(eventLsnCommit) > Long.parseLong(stateOffsetLsnCommit);
} catch (Exception e) {
LOGGER.info("Encountered an error while attempting to parse event's LSN sequence {}", e.getCause());
return false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void commitLSNToPostgresDatabase(final JsonNode jdbcConfig,

final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf(savedOffset.getAsLong());

LOGGER.info("Committing upto LSN: {}", savedOffset.getAsLong());
try (final BaseConnection pgConnection = (BaseConnection) PostgresReplicationConnection.createConnection(jdbcConfig)) {
ChainedLogicalStreamBuilder streamBuilder = pgConnection
.getReplicationAPI()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,9 +871,9 @@ protected void verifyCheckpointStatesByRecords() throws Exception {
.toListAndClose(secondBatchIterator);
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
if (!isOnLegacyPostgres()) {
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
}
// We expect only one cdc state message, as all the records are inserted in a single transaction. Since
// lsn_commit only increases with a new transaction, we expect only one state message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we not expecting a checkpoint state message (after the transaction) and a final state message in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one state message we expect is after the dbz engine is shut down. In this case, there is just one transaction so only one state message is expected. (that's the change we're trying to test by comparing against lsn_commit and not lsn_proc)

assertTrue(stateMessagesCDC.size() == 1, "Generated only the final state.");
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}

Expand Down Expand Up @@ -912,9 +912,9 @@ protected void verifyCheckpointStatesBySeconds() throws Exception {

assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
if (!isOnLegacyPostgres()) {
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
}
// We expect only one cdc state message, as all the records are inserted in a single transaction. Since
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for the additional comment. Does that mean that somehow, your change is also fixing legacy postgres?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite. For legacy postgres, only one state message was expected (not sure why).

So this change just streamlines the test to check for just one state message instead of multiple (which I'm not sure was a good test to begin with)

Copy link
Contributor

@stephane-airbyte stephane-airbyte Mar 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we NEVER are on legacy postgres... Let's remove that method altogether, no?

// lsn_commit only increases with a new transaction, we expect only one state message.
assertTrue(stateMessagesCDC.size() == 1, "Generated only the final state.");
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.17 | 2024-03-12 | [35939](https://github.com/airbytehq/airbyte/pull/35939) | Use lsn_commit value instead of lsn_proc for CDC checkpointing logic. |
| 3.3.16 | 2024-03-11 | [35904](https://github.com/airbytehq/airbyte/pull/35904) | Adopt Java CDK 0.23.1- debezium retries. |
| 3.3.15 | 2024-02-29 | [34724](https://github.com/airbytehq/airbyte/pull/34724) | Add record count in state message. |
| 3.3.14 | 2024-03-06 | [35842](https://github.com/airbytehq/airbyte/pull/35842) | Add logging to understand cases with a large number of records with the same LSN. |
Expand Down
Loading