diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 29082fa14ee7b..f257496e12416 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -156,6 +156,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.2 | 2023-10-31 | [\#31976](https://github.com/airbytehq/airbyte/pull/31976) | Debezium tweaks to make tests run faster. | | 0.2.0 | 2023-10-30 | [\#31960](https://github.com/airbytehq/airbyte/pull/31960) | Hoist top-level gradle subprojects into CDK. | | 0.1.12 | 2023-10-24 | [\#31674](https://github.com/airbytehq/airbyte/pull/31674) | Fail sync when Debezium does not shut down properly. | | 0.1.11 | 2023-10-18 | [\#31486](https://github.com/airbytehq/airbyte/pull/31486) | Update constants in AdaptiveSourceRunner. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index caeaf12047c26..f4f461910bae2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.2.0 +version=0.2.2 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java index ca096d4fe9f0c..563360578b760 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java @@ -23,6 +23,7 @@ import io.debezium.engine.ChangeEvent; import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.Optional; import java.util.OptionalInt; import java.util.Properties; @@ -98,6 +99,10 @@ public AutoCloseableIterator getSnapshotIterators( .fromIterator(MoreIterators.singletonIteratorFromSupplier(cdcStateHandler::saveStateAfterCompletionOfSnapshotOfNewStreams))); } + /** + * In the default case here, we don't know for sure whether the Debezium Engine will produce records + * or not. We therefore pass {@link canShortCircuitDebeziumEngine} = false. + */ public AutoCloseableIterator getIncrementalIterators(final ConfiguredAirbyteCatalog catalog, final CdcSavedInfoFetcher cdcSavedInfoFetcher, final CdcStateHandler cdcStateHandler, @@ -106,24 +111,59 @@ public AutoCloseableIterator getIncrementalIterators(final Confi final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType, final Instant emittedAt, final boolean addDbNameToState) { + return getIncrementalIterators( + catalog, + cdcSavedInfoFetcher, + cdcStateHandler, + cdcMetadataInjector, + connectorProperties, + debeziumConnectorType, + emittedAt, addDbNameToState, + false); + } + + /** + * + * @param canShortCircuitDebeziumEngine This argument may be set to true in cases where we already + * know that the Debezium Engine is not going to be producing any change events. In this + * case, this method skips provisioning a Debezium Engine altogether. + */ + public AutoCloseableIterator getIncrementalIterators(final ConfiguredAirbyteCatalog catalog, + final CdcSavedInfoFetcher cdcSavedInfoFetcher, + final CdcStateHandler cdcStateHandler, + final CdcMetadataInjector cdcMetadataInjector, + final Properties connectorProperties, + final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType, + final Instant emittedAt, + final boolean addDbNameToState, + final boolean canShortCircuitDebeziumEngine) { LOGGER.info("Using CDC: {}", true); - final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY)); - final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset(), + final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState( + cdcSavedInfoFetcher.getSavedOffset(), addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty()); final Optional schemaHistoryManager = - trackSchemaHistory ? schemaHistoryManager(cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()) + trackSchemaHistory ? schemaHistoryManager( + cdcSavedInfoFetcher.getSavedSchemaHistory(), + cdcStateHandler.compressSchemaHistoryForState()) : Optional.empty(); - final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager, - schemaHistoryManager, debeziumConnectorType); - publisher.start(queue); - // handle state machine around pub/sub logic. - final AutoCloseableIterator eventIterator = new DebeziumRecordIterator<>( - queue, - targetPosition, - publisher::hasClosed, - new DebeziumShutdownProcedure<>(queue, publisher::close, publisher::hasClosed), - firstRecordWaitTime); + final AutoCloseableIterator eventIterator; + if (!canShortCircuitDebeziumEngine) { + final var publisher = new DebeziumRecordPublisher( + connectorProperties, config, catalog, offsetManager, schemaHistoryManager, debeziumConnectorType); + final var queue = new LinkedBlockingQueue>(queueSize.orElse(QUEUE_CAPACITY)); + publisher.start(queue); + // handle state machine around pub/sub logic. + eventIterator = new DebeziumRecordIterator<>( + queue, + targetPosition, + publisher::hasClosed, + new DebeziumShutdownProcedure<>(queue, publisher::close, publisher::hasClosed), + firstRecordWaitTime); + } else { + LOGGER.info("Short-circuiting Debezium Engine: nothing of interest in target replication stream interval."); + eventIterator = AutoCloseableIterators.fromIterator(Collections.emptyIterator()); + } final Duration syncCheckpointDuration = config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong()) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.java index 7aaf29ef9466a..6255acefaa805 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.java @@ -149,7 +149,7 @@ protected ChangeEventWithMetadata computeNext() { while (!debeziumShutdownProcedure.getRecordsRemainingAfterShutdown().isEmpty()) { final ChangeEvent event; try { - event = debeziumShutdownProcedure.getRecordsRemainingAfterShutdown().poll(10, TimeUnit.SECONDS); + event = debeziumShutdownProcedure.getRecordsRemainingAfterShutdown().poll(100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumShutdownProcedure.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumShutdownProcedure.java index 1a14670f25ae5..d0661e0a7cdcf 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumShutdownProcedure.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumShutdownProcedure.java @@ -48,9 +48,9 @@ public DebeziumShutdownProcedure(final LinkedBlockingQueue sourceQueue, private Runnable transfer() { return () -> { - while (!sourceQueue.isEmpty() || !MoreBooleans.isTruthy(publisherStatusSupplier.get())) { + while (!sourceQueue.isEmpty() || !hasEngineShutDown()) { try { - T event = sourceQueue.poll(10, TimeUnit.SECONDS); + final T event = sourceQueue.poll(100, TimeUnit.MILLISECONDS); if (event != null) { targetQueue.put(event); } @@ -62,13 +62,17 @@ private Runnable transfer() { }; } + private boolean hasEngineShutDown() { + return MoreBooleans.isTruthy(publisherStatusSupplier.get()); + } + private void initiateTransfer() { executorService.execute(transfer()); } public LinkedBlockingQueue getRecordsRemainingAfterShutdown() { if (!hasTransferThreadShutdown) { - LOGGER.warn("Queue transfer thread has not shutdown, some records might be missing"); + LOGGER.warn("Queue transfer thread has not shut down, some records might be missing."); } return targetQueue; } @@ -83,8 +87,8 @@ public LinkedBlockingQueue getRecordsRemainingAfterShutdown() { * complete we just have to read the remaining records from the {@link targetQueue} */ public void initiateShutdownProcedure() { - if (publisherStatusSupplier.get()) { - LOGGER.info("Engine has already shutdown"); + if (hasEngineShutDown()) { + LOGGER.info("Debezium Engine has already shut down."); return; } Exception exceptionDuringEngineClose = null; diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/postgres/PostgresDebeziumStateUtil.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/postgres/PostgresDebeziumStateUtil.java index 174c03893fa24..938fd11e903ed 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/postgres/PostgresDebeziumStateUtil.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/postgres/PostgresDebeziumStateUtil.java @@ -124,6 +124,48 @@ public void commitLSNToPostgresDatabase(final JsonNode jdbcConfig, } } + public boolean maybeReplicationStreamIntervalHasRecords(final JsonNode jdbcConfig, + final String slotName, + final String publicationName, + final String plugin, + final long startOffset, + final long endOffset) { + try (final BaseConnection pgConnection = (BaseConnection) PostgresReplicationConnection.createConnection(jdbcConfig)) { + ChainedLogicalStreamBuilder streamBuilder = pgConnection + .getReplicationAPI() + .replicationStream() + .logical() + .withSlotName("\"" + slotName + "\"") + .withStartPosition(LogSequenceNumber.valueOf(startOffset)); + streamBuilder = addSlotOption(publicationName, plugin, pgConnection, streamBuilder); + + try (final PGReplicationStream stream = streamBuilder.start()) { + LogSequenceNumber current = stream.getLastReceiveLSN(); + final LogSequenceNumber end = LogSequenceNumber.valueOf(endOffset); + // Attempt to read from the stream. + // This will advance the stream past any bookkeeping entries, until: + // - either the end of the stream is reached, + // - or a meaningful entry is read. + // In the first case, we can update the current position and conclude that the stream contains + // nothing of + // interest to us between the starting position and the current position. + final var msg = stream.readPending(); + if (msg == null) { + current = stream.getLastReceiveLSN(); + } + if (current.compareTo(end) >= 0) { + // If we've reached or gone past the end of the interval which interests us, + // then there's nothing in it that we could possibly care about. + return false; + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + // In all other cases, we can't draw any conclusions as to the contents of the stream interval. + return true; + } + private ChainedLogicalStreamBuilder addSlotOption(final String publicationName, final String plugin, final BaseConnection pgConnection, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java index bd753e2972468..f69ddc78311b6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java @@ -180,8 +180,8 @@ public void LsnCommitTest(final String plugin) throws SQLException { database.execute("insert into public.test_table values (1, 'foo');"); database.execute("insert into public.test_table values (2, 'bar');"); - final Lsn lsnAtTheBeginning = Lsn.valueOf( - getReplicationSlot(database, fullReplicationSlot, plugin, dbName).get("confirmed_flush_lsn").asText()); + final var slotStateAtTheBeginning = getReplicationSlot(database, fullReplicationSlot, plugin, dbName); + final Lsn lsnAtTheBeginning = Lsn.valueOf(slotStateAtTheBeginning.get("confirmed_flush_lsn").asText()); final long targetLsn = PostgresUtils.getLsn(database).asLong(); postgresDebeziumStateUtil.commitLSNToPostgresDatabase(Jsons.jsonNode(databaseConfig), @@ -190,11 +190,52 @@ public void LsnCommitTest(final String plugin) throws SQLException { publication, plugin); - final Lsn lsnAfterCommit = Lsn.valueOf( - getReplicationSlot(database, fullReplicationSlot, plugin, dbName).get("confirmed_flush_lsn").asText()); + final var slotStateAfterCommit = getReplicationSlot(database, fullReplicationSlot, plugin, dbName); + final Lsn lsnAfterCommit = Lsn.valueOf(slotStateAfterCommit.get("confirmed_flush_lsn").asText()); Assertions.assertEquals(1, lsnAfterCommit.compareTo(lsnAtTheBeginning)); Assertions.assertEquals(targetLsn, lsnAfterCommit.asLong()); + Assertions.assertNotEquals(slotStateAtTheBeginning, slotStateAfterCommit); + + // Now check that maybeReplicationStreamIntervalHasRecords behaves as expected. + + final long lsnCommitted = lsnAfterCommit.asLong(); + + database.execute("SELECT txid_current();"); + database.execute("CHECKPOINT"); + final long lsnAfterBookkeepingStatements = PostgresUtils.getLsn(database).asLong(); + Assertions.assertNotEquals(lsnCommitted, lsnAfterBookkeepingStatements); + + Assertions.assertFalse(postgresDebeziumStateUtil.maybeReplicationStreamIntervalHasRecords( + Jsons.jsonNode(databaseConfig), + fullReplicationSlot, + publication, + plugin, + lsnCommitted, + lsnAfterBookkeepingStatements)); + + database.execute("INSERT INTO public.test_table VALUES (3, 'baz');"); + final long lsnAfterMeaningfulStatement = PostgresUtils.getLsn(database).asLong(); + Assertions.assertNotEquals(lsnCommitted, lsnAfterMeaningfulStatement); + + Assertions.assertTrue(postgresDebeziumStateUtil.maybeReplicationStreamIntervalHasRecords( + Jsons.jsonNode(databaseConfig), + fullReplicationSlot, + publication, + plugin, + lsnCommitted, + lsnAfterMeaningfulStatement)); + Assertions.assertTrue(postgresDebeziumStateUtil.maybeReplicationStreamIntervalHasRecords( + Jsons.jsonNode(databaseConfig), + fullReplicationSlot, + publication, + plugin, + lsnAfterBookkeepingStatements, + lsnAfterMeaningfulStatement)); + + final var slotStateAtTheEnd = getReplicationSlot(database, fullReplicationSlot, plugin, dbName); + Assertions.assertEquals(slotStateAfterCommit, slotStateAtTheEnd); + container.stop(); } } diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 4995a8e1a9471..fdebeab16db84 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -7,7 +7,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' + cdkVersionRequired = '0.2.2' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index b0f688901fcd6..7244657eef0af 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -1,4 +1,7 @@ data: + ab_internal: + ql: 400 + sl: 300 allowedHosts: hosts: - ${host} @@ -6,12 +9,13 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.2.15 - maxSecondsBetweenMessages: 7200 + dockerImageTag: 3.2.16 dockerRepository: airbyte/source-postgres + documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres icon: postgresql.svg license: ELv2 + maxSecondsBetweenMessages: 7200 name: Postgres registries: cloud: @@ -19,12 +23,8 @@ data: oss: enabled: true releaseStage: generally_available - documentationUrl: https://docs.airbyte.com/integrations/sources/postgres + supportLevel: certified tags: - language:java - language:python - ab_internal: - sl: 300 - ql: 400 - supportLevel: certified metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index 34224f66a84ab..a9a5553a6f72d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -161,18 +161,32 @@ public static List> cdcCtidIteratorsCombin LOGGER.info("No streams will be synced via ctid"); } + final var targetPosition = PostgresCdcTargetPosition.targetPosition(database); final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, - PostgresCdcTargetPosition.targetPosition(database), false, firstRecordWaitTime, queueSize); + targetPosition, false, firstRecordWaitTime, queueSize); final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager); - final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog, + final boolean canShortCircuitDebeziumEngine = savedOffset.isPresent() && + // Until the need presents itself in production, short-circuiting should only be done in tests. + sourceConfig.has("is_test") && sourceConfig.get("is_test").asBoolean() && + !postgresDebeziumStateUtil.maybeReplicationStreamIntervalHasRecords( + database.getDatabaseConfig(), + sourceConfig.get("replication_method").get("replication_slot").asText(), + sourceConfig.get("replication_method").get("publication").asText(), + PostgresUtils.getPluginValue(sourceConfig.get("replication_method")), + savedOffset.getAsLong(), + targetPosition.targetLsn.asLong()); + + final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( + catalog, new PostgresCdcSavedInfoFetcher(stateToBeUsed), postgresCdcStateHandler, new PostgresCdcConnectorMetadataInjector(), PostgresCdcProperties.getDebeziumDefaultProperties(database), DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB, emittedAt, - false); + false, + canShortCircuitDebeziumEngine); if (initialSyncCtidIterators.isEmpty()) { return Collections.singletonList(incrementalIteratorSupplier.get()); diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java index 5d564d2b6375f..609bf9def3b96 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java @@ -25,7 +25,11 @@ public class PostgresCdcProperties { - private static final int HEARTBEAT_FREQUENCY_SEC = 10; + private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(10L); + + // Test execution latency is lower when heartbeats are more frequent. + private static final Duration HEARTBEAT_INTERVAL_IN_TESTS = Duration.ofSeconds(1L); + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcProperties.class); public static Properties getDebeziumDefaultProperties(final JdbcDatabase database) { @@ -58,7 +62,13 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("converters", "datetime"); props.setProperty("datetime.type", PostgresConverter.class.getName()); props.setProperty("include.unknown.datatypes", "true"); - props.setProperty("heartbeat.interval.ms", Long.toString(Duration.ofSeconds(HEARTBEAT_FREQUENCY_SEC).toMillis())); + + final Duration heartbeatInterval = + (database.getSourceConfig().has("is_test") && database.getSourceConfig().get("is_test").asBoolean()) + ? HEARTBEAT_INTERVAL_IN_TESTS + : HEARTBEAT_INTERVAL; + props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis())); + if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) { props.setProperty("flush.lsn.source", "false"); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index f7209a81ea31e..0cce4b29c6046 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -567,7 +567,8 @@ protected Source getSource() { @Override protected JsonNode getConfig() { - return config; + // Clone it to guard against accidental mutations. + return Jsons.clone(config); } @Override diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index d30561a5a0362..5893cb9ab3545 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.2.16 | 2023-10-31 | [31976](https://github.com/airbytehq/airbyte/pull/31976) | Speed up tests involving Debezium | | 3.2.15 | 2023-10-30 | [31960](https://github.com/airbytehq/airbyte/pull/31960) | Adopt java CDK version 0.2.0. | | 3.2.14 | 2023-10-24 | [31792](https://github.com/airbytehq/airbyte/pull/31792) | Fix error message link on issue with standby | | 3.2.14 | 2023-10-24 | [31792](https://github.com/airbytehq/airbyte/pull/31792) | fail sync when debezeum fails to shutdown cleanly | @@ -479,4 +480,4 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | 0.1.7 | 2021-01-08 | [1307](https://github.com/airbytehq/airbyte/pull/1307) | Migrate Postgres and MySql to use new JdbcSource | | 0.1.6 | 2020-12-09 | [1172](https://github.com/airbytehq/airbyte/pull/1172) | Support incremental sync | | 0.1.5 | 2020-11-30 | [1038](https://github.com/airbytehq/airbyte/pull/1038) | Change JDBC sources to discover more than standard schemas | -| 0.1.4 | 2020-11-30 | [1046](https://github.com/airbytehq/airbyte/pull/1046) | Add connectors using an index YAML file | +| 0.1.4 | 2020-11-30 | [1046](https://github.com/airbytehq/airbyte/pull/1046) | Add connectors using an index YAML file | \ No newline at end of file