Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Source-postgres] Set default cursor value for cdc mode #27442

Merged
merged 8 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
// stream with PK
streams.get(0).setSourceDefinedCursor(true);
addCdcMetadataColumns(streams.get(0));
addCdcDefaultCursorField(streams.get(0));

final AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
Expand All @@ -779,6 +780,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
Field.of(COL_MODEL, JsonSchemaType.STRING));
streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
addCdcDefaultCursorField(streamWithoutPK);
addCdcMetadataColumns(streamWithoutPK);

final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
Expand All @@ -790,6 +792,8 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
.withSourceDefinedCursor(true)
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")));

addCdcDefaultCursorField(randomStream);
addCdcMetadataColumns(randomStream);

streams.add(streamWithoutPK);
Expand All @@ -815,6 +819,8 @@ protected AirbyteCatalog expectedCatalogForDiscover() {

/**
* Hook invoked on each expected stream while building the expected discover catalog; lets the
* concrete test add its connector's CDC metadata columns to that stream.
*
* @param stream expected stream to modify in place
*/
protected abstract void addCdcMetadataColumns(final AirbyteStream stream);

/**
* Hook invoked on each expected stream while building the expected discover catalog; lets the
* concrete test set the default cursor field its connector reports in CDC mode. Implementations
* may leave the stream untouched when the connector sets no default cursor.
*
* @param stream expected stream to modify in place
*/
protected abstract void addCdcDefaultCursorField(final AirbyteStream stream);

/**
* @return the source supplied by the concrete test (presumably the connector under test; confirm
*         against the callers)
*/
protected abstract Source getSource();

/**
* @return JSON configuration supplied by the concrete test (presumably the connector config
*         handed to the source; confirm against the callers)
*/
protected abstract JsonNode getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ protected void addCdcMetadataColumns(final AirbyteStream stream) {

}

/**
* Intentionally a no-op: the expected stream is left without a default cursor field.
*
* @param stream expected stream; not modified
*/
@Override
protected void addCdcDefaultCursorField(final AirbyteStream stream) {
// Leaving empty until cdc default cursor is implemented for MSSQL
}

@Override
protected Source getSource() {
return new MssqlSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ protected void addCdcMetadataColumns(final AirbyteStream stream) {
properties.set(CDC_DELETED_AT, stringType);
}

/**
* Intentionally a no-op: the expected stream is left without a default cursor field.
*
* @param stream expected stream; not modified
*/
@Override
protected void addCdcDefaultCursorField(final AirbyteStream stream) {
// Leaving empty until cdc default cursor is implemented for MySQL
}

@Override
protected Source getSource() {
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.1.1
LABEL io.airbyte.version=3.0.0
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
maxSecondsBetweenMessages: 7200
dockerImageTag: 2.1.1
dockerImageTag: 3.0.0
dockerRepository: airbyte/source-postgres-strict-encrypt
githubIssueLabel: source-postgres
icon: postgresql.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.1.1
LABEL io.airbyte.version=3.0.0
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
- config_path: "secrets/config_cdc.json"
backward_compatibility_tests_config:
disable_for_version: "2.1.1"
basic_read:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 2.1.1
dockerImageTag: 3.0.0
maxSecondsBetweenMessages: 7200
dockerRepository: airbyte/source-postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_LSN;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -123,7 +126,7 @@ public static Set<AirbyteStreamNameNamespacePair> getPublicizedTables(final Jdbc
}

/**
* This method is used for xmin synsc in order to overwrite sync modes for cursor fields. For xmin, we want streams to only have incremental mode
* This method is used for xmin syncs in order to overwrite sync modes for cursor fields. For xmin, we want streams to only have incremental mode
* enabled.
*
* @param stream - airbyte stream
Expand All @@ -133,4 +136,12 @@ public static AirbyteStream overrideSyncModesForXmin(final AirbyteStream stream)
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL));
}

/**
* Sets the default cursor field of a CDC stream to the LSN column ({@code CDC_LSN}).
*
* <p>
* To prepare for Destination v2, CDC streams must have a default cursor field. The LSN is used
* because it is monotonically increasing and unique.
*
* @param stream - airbyte stream; modified in place
* @return the same stream instance, with its default cursor field set
*/
public static AirbyteStream setDefaultCursorFieldForCdc(final AirbyteStream stream) {
final ImmutableList<String> lsnCursorField = ImmutableList.of(CDC_LSN);
stream.setDefaultCursorField(lsnCursorField);
return stream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
.map(PostgresCatalogHelper::overrideSyncModes)
.map(PostgresCatalogHelper::removeIncrementalWithoutPk)
.map(PostgresCatalogHelper::setIncrementalToSourceDefined)
.map(PostgresCatalogHelper::setDefaultCursorFieldForCdc)
Comment on lines 277 to +278
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼 only for cdc, we are setting the default cursor.

.map(PostgresCatalogHelper::addCdcMetadataColumns)
// If we're in CDC mode and a stream is not in the publication, the user should only be able to sync
// this in FULL_REFRESH mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ protected void addCdcMetadataColumns(final AirbyteStream stream) {

}

/**
* Sets the LSN column ({@code CDC_LSN}) as the default cursor field of the expected stream.
*
* @param stream expected stream to modify in place
*/
@Override
protected void addCdcDefaultCursorField(final AirbyteStream stream) {
final ImmutableList<String> lsnCursorField = ImmutableList.of(CDC_LSN);
stream.setDefaultCursorField(lsnCursorField);
}

@Override
protected Source getSource() {
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_LSN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -16,6 +17,7 @@
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;

class PostgresCatalogHelperTest {

Expand Down Expand Up @@ -64,6 +66,16 @@ public void testSetIncrementalToSourceDefined() {
.setIncrementalToSourceDefined(noIncremental)
.getSourceDefinedCursor());
}
/**
* Verifies that {@code setDefaultCursorFieldForCdc} sets the LSN column as the default cursor
* field and leaves the source-defined-cursor flag untouched.
*/
@Test
public void testSetDefaultCursorFieldForCdc() {
// The stream only advertises FULL_REFRESH; the helper is expected to set the cursor regardless.
final AirbyteStream cdcStream = new AirbyteStream()
.withSourceDefinedCursor(true)
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
PostgresCatalogHelper.setDefaultCursorFieldForCdc(cdcStream);

assertTrue(cdcStream.getSourceDefinedCursor());
// JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages
// report the expected and actual values correctly.
assertEquals(ImmutableList.of(CDC_LSN), cdcStream.getDefaultCursorField());
}

@Test
public void testAddCdcMetadataColumns() {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ Some larger tables may encounter an error related to the temporary file size lim

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --- |
| 3.0.0 | 2023-07-12 | [27442](https://github.com/airbytehq/airbyte/pull/27442) | Set `_ab_cdc_lsn` as the source-defined cursor for CDC mode to prepare for Destination v2 normalization |
| 2.1.1 | 2023-07-06 | [26723](https://github.com/airbytehq/airbyte/pull/26723) | Add new xmin replication method. |
| 2.1.0 | 2023-06-26 | [27737](https://github.com/airbytehq/airbyte/pull/27737) | License Update: Elv2 |
| 2.0.34 | 2023-06-20 | [27212](https://github.com/airbytehq/airbyte/pull/27212) | Fix silent exception swallowing in StreamingJdbcDatabase |
Expand Down