Skip to content

Commit

Permalink
🎉 Postgres source: retrieve only the tables in the publication under …
Browse files Browse the repository at this point in the history
…cdc mode (#14447)

* Move helper methods to dedicated class

* Add unit tests for existing methods

* Move constant to debezium event util

* Include only publicized tables under cdc mode

* Add comments

* Add unit test

* Format code

* Move isCdc method out of postgres source

* Bump versions

* Update doc

* Add comment in unit test

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
tuliren and octavia-squidington-iii authored Jul 10, 2022
1 parent 97eddb0 commit 078f5fc
Show file tree
Hide file tree
Showing 17 changed files with 417 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.30
dockerImageTag: 0.4.31
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6854,7 +6854,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.30"
- dockerImage: "airbyte/source-postgres:0.4.31"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

public class DebeziumEventUtils {

public static final String CDC_LSN = "_ab_cdc_lsn";
public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at";
public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.version=0.4.31
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.version=0.4.31
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that adjust discovered catalog streams for CDC mode and look up which tables
 * belong to the configured publication. Not instantiable.
 */
public final class PostgresCdcCatalogHelper {

  private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcCatalogHelper.class);

  private PostgresCdcCatalogHelper() {}

  /**
   * Drops {@link SyncMode#INCREMENTAL} from the supported sync modes of a stream that has no
   * source-defined primary key; streams with a primary key are returned untouched.
   *
   * <p>
   * Rationale: it isn't possible to recreate the state of the original database unless we include
   * extra information (like an oid) when using logical replication. By limiting to Full Refresh
   * when we don't have a primary key we dodge the problem for now. As a work around a CDC and
   * non-CDC source could be configured if there's a need to replicate a large non-PK table.
   *
   * <p>
   * Note: in place mutation. The sync mode list is copied before the removal and the copy is set
   * back on the stream.
   *
   * @param stream the stream to adjust
   * @return the same {@code stream} instance
   */
  public static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stream) {
    final boolean hasPrimaryKey = !stream.getSourceDefinedPrimaryKey().isEmpty();
    if (hasPrimaryKey) {
      return stream;
    }

    final List<SyncMode> remainingModes = new ArrayList<>(stream.getSupportedSyncModes());
    remainingModes.remove(SyncMode.INCREMENTAL);
    stream.setSupportedSyncModes(remainingModes);
    return stream;
  }

  /**
   * Marks the cursor as source defined on every stream that supports incremental sync, so that the
   * user cannot set or override a cursor field.
   *
   * <p>
   * Note: in place mutation.
   *
   * @param stream the stream to adjust
   * @return the same {@code stream} instance
   */
  public static AirbyteStream setIncrementalToSourceDefined(final AirbyteStream stream) {
    final boolean supportsIncremental = stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL);
    if (supportsIncremental) {
      stream.setSourceDefinedCursor(true);
    }

    return stream;
  }

  /**
   * Adds the CDC metadata columns (LSN as a number, updated-at and deleted-at as strings) to the
   * {@code properties} object of the stream's JSON schema.
   *
   * <p>
   * Note: in place mutation.
   *
   * @param stream the stream whose JSON schema is extended
   * @return the same {@code stream} instance
   */
  public static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
    final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number"));
    final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string"));

    final ObjectNode schema = (ObjectNode) stream.getJsonSchema();
    final ObjectNode schemaProperties = (ObjectNode) schema.get("properties");
    schemaProperties.set(DebeziumEventUtils.CDC_LSN, numberType);
    // Both timestamp columns intentionally share the one string type node.
    schemaProperties.set(DebeziumEventUtils.CDC_UPDATED_AT, stringType);
    schemaProperties.set(DebeziumEventUtils.CDC_DELETED_AT, stringType);

    return stream;
  }

  /**
   * Looks up the tables that belong to the publication named in the source config.
   *
   * @param database the database to query; its source config supplies the publication name
   * @return tables included in the publication. When it is not CDC mode (or the database has no
   *         source config), returns an empty set.
   * @throws SQLException declared for the {@code pg_publication_tables} query
   */
  public static Set<AirbyteStreamNameNamespacePair> getPublicizedTables(final JdbcDatabase database) throws SQLException {
    final JsonNode sourceConfig = database.getSourceConfig();
    final boolean cdcEnabled = sourceConfig != null && PostgresUtils.isCdc(sourceConfig);
    if (!cdcEnabled) {
      return Collections.emptySet();
    }

    final JsonNode replicationMethod = sourceConfig.get("replication_method");
    final String publication = replicationMethod.get("publication").asText();

    final List<JsonNode> publicationRows = database.queryJsons(
        "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?", publication);
    final Set<AirbyteStreamNameNamespacePair> publicizedTables = publicationRows.stream()
        .map(PostgresCdcCatalogHelper::toNameNamespacePair)
        .collect(Collectors.toSet());

    final List<String> qualifiedNames = publicizedTables.stream()
        .map(PostgresCdcCatalogHelper::toQualifiedName)
        .toList();
    LOGGER.info("For CDC, only tables in publication {} will be included in the sync: {}", publication, qualifiedNames);

    return publicizedTables;
  }

  // Converts one pg_publication_tables row into a (table name, schema name) pair.
  private static AirbyteStreamNameNamespacePair toNameNamespacePair(final JsonNode row) {
    return new AirbyteStreamNameNamespacePair(row.get("tablename").asText(), row.get("schemaname").asText());
  }

  // Renders a pair as "namespace.name" for logging.
  private static String toQualifiedName(final AirbyteStreamNameNamespacePair pair) {
    return pair.getNamespace() + "." + pair.getName();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.PostgresSource.CDC_LSN;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;

public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector {

@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
final long lsn = source.get("lsn").asLong();
event.put(CDC_LSN, lsn);
event.put(DebeziumEventUtils.CDC_LSN, lsn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
Expand All @@ -36,8 +33,8 @@
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
Expand All @@ -46,7 +43,6 @@
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand All @@ -66,7 +62,6 @@
public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);
public static final String CDC_LSN = "_ab_cdc_lsn";

static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();
private List<String> schemas;
Expand Down Expand Up @@ -138,11 +133,11 @@ public Set<String> getExcludedInternalNameSpaces() {
public AirbyteCatalog discover(final JsonNode config) throws Exception {
final AirbyteCatalog catalog = super.discover(config);

if (isCdc(config)) {
if (PostgresUtils.isCdc(config)) {
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresSource::removeIncrementalWithoutPk)
.map(PostgresSource::setIncrementalToSourceDefined)
.map(PostgresSource::addCdcMetadataColumns)
.map(PostgresCdcCatalogHelper::removeIncrementalWithoutPk)
.map(PostgresCdcCatalogHelper::setIncrementalToSourceDefined)
.map(PostgresCdcCatalogHelper::addCdcMetadataColumns)
.collect(toList());

catalog.setStreams(streams);
Expand All @@ -153,6 +148,19 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
final List<TableInfo<CommonField<JDBCType>>> rawTables = discoverRawTables(database);
final Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc = PostgresCdcCatalogHelper.getPublicizedTables(database);

if (publicizedTablesInCdc.isEmpty()) {
return rawTables;
}
// under cdc mode, only return tables that are in the publication
return rawTables.stream()
.filter(table -> publicizedTablesInCdc.contains(new AirbyteStreamNameNamespacePair(table.getName(), table.getNameSpace())))
.collect(toList());
}

public List<TableInfo<CommonField<JDBCType>>> discoverRawTables(final JdbcDatabase database) throws Exception {
if (schemas != null && !schemas.isEmpty()) {
// process explicitly selected (from UI) schemas
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
Expand All @@ -177,7 +185,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(
super.getCheckOperations(config));

if (isCdc(config)) {
if (PostgresUtils.isCdc(config)) {
checkOperations.add(database -> {
final List<JsonNode> matchingSlots = database.queryJsons(connection -> {
final String sql = "SELECT * FROM pg_replication_slots WHERE slot_name = ? AND plugin = ? AND database = ?";
Expand All @@ -186,8 +194,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
ps.setString(2, PostgresUtils.getPluginValue(config.get("replication_method")));
ps.setString(3, config.get("database").asText());

LOGGER.info(
"Attempting to find the named replication slot using the query: " + ps.toString());
LOGGER.info("Attempting to find the named replication slot using the query: {}", ps);

return ps;
}, sourceOperations::rowToJson);
Expand Down Expand Up @@ -244,7 +251,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
PostgresCdcTargetPosition.targetPosition(database), false);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
Expand Down Expand Up @@ -286,31 +293,6 @@ protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Configur
.collect(Collectors.toList());
}

@VisibleForTesting
static boolean isCdc(final JsonNode config) {
final boolean isCdc = config.hasNonNull("replication_method")
&& config.get("replication_method").hasNonNull("replication_slot")
&& config.get("replication_method").hasNonNull("publication");
LOGGER.info("using CDC: {}", isCdc);
return isCdc;
}

/*
* It isn't possible to recreate the state of the original database unless we include extra
* information (like an oid) when using logical replication. By limiting to Full Refresh when we
* don't have a primary key we dodge the problem for now. As a work around a CDC and non-CDC source
* could be configured if there's a need to replicate a large non-PK table.
*
* Note: in place mutation.
*/
private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stream) {
if (stream.getSourceDefinedPrimaryKey().isEmpty()) {
stream.getSupportedSyncModes().remove(SyncMode.INCREMENTAL);
}

return stream;
}

@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
final String schema)
Expand Down Expand Up @@ -364,34 +346,6 @@ protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set<String>
return false;
}

/*
* Set all streams that do have incremental to sourceDefined, so that the user cannot set or
* override a cursor field.
*
* Note: in place mutation.
*/
private static AirbyteStream setIncrementalToSourceDefined(final AirbyteStream stream) {
if (stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL)) {
stream.setSourceDefinedCursor(true);
}

return stream;
}

// Note: in place mutation.
private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
final ObjectNode properties = (ObjectNode) jsonSchema.get("properties");

final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string"));
final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number"));
properties.set(CDC_LSN, numberType);
properties.set(CDC_UPDATED_AT, stringType);
properties.set(CDC_DELETED_AT, stringType);

return stream;
}

// TODO This is a temporary override so that the Postgres source can take advantage of per-stream
// state
@Override
Expand All @@ -410,7 +364,7 @@ protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode con

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
return isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@
package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Small helpers for reading replication-related settings out of the Postgres source config.
 */
public class PostgresUtils {

  private static final Logger LOGGER = LoggerFactory.getLogger(PostgresUtils.class);

  private static final String PGOUTPUT_PLUGIN = "pgoutput";

  /**
   * Reads the {@code plugin} value from the given node.
   *
   * @param field the node to read from (callers in this source pass the {@code replication_method}
   *        node)
   * @return the text of the {@code plugin} field when the field is present, otherwise
   *         {@code "pgoutput"}
   */
  public static String getPluginValue(final JsonNode field) {
    if (field.has("plugin")) {
      return field.get("plugin").asText();
    }
    return PGOUTPUT_PLUGIN;
  }

  /**
   * Tells whether the config selects CDC, and logs the outcome at info level.
   *
   * @param config the source config; must not be null
   * @return true only when {@code replication_method} is present and non-null and it carries
   *         non-null {@code replication_slot} and {@code publication} values
   */
  public static boolean isCdc(final JsonNode config) {
    boolean cdcConfigured = false;
    if (config.hasNonNull("replication_method")) {
      final JsonNode replicationMethod = config.get("replication_method");
      cdcConfigured = replicationMethod.hasNonNull("replication_slot")
          && replicationMethod.hasNonNull("publication");
    }
    LOGGER.info("using CDC: {}", cdcConfigured);
    return cdcConfigured;
  }

}
Loading

0 comments on commit 078f5fc

Please sign in to comment.