Skip to content

Commit cb68670

Browse files
[Source-Postgres] : Implement WASS algorithm (#41649)
Co-authored-by: Evan Tahler <evan@airbyte.io>
1 parent 548c16c commit cb68670

File tree

11 files changed

+235
-76
lines changed

11 files changed

+235
-76
lines changed

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.40.10'
15+
cdkVersionRequired = '0.41.6'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.26
12+
dockerImageTag: 3.5.0
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
556556
* decorateWithStartedStatus=
557557
*/ true, /*
558558
* decorateWithCompletedStatus=
559-
*/ true));
559+
*/ true,
560+
Optional.empty()));
560561
final List<AutoCloseableIterator<AirbyteMessage>> xminIterators = new ArrayList<>(xminHandler.getIncrementalIterators(
561562
new ConfiguredAirbyteCatalog().withStreams(xminStreams.streamsForXminSync()), tableNameToTable, emittedAt));
562563

@@ -600,7 +601,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
600601
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(
601602
cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid),
602603
tableNameToTable,
603-
emittedAt, /* decorateWithStartedStatus= */ true, /* decorateWithCompletedStatus= */ true));
604+
emittedAt, /* decorateWithStartedStatus= */ true, /* decorateWithCompletedStatus= */ true, Optional.empty()));
604605
final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterators = new ArrayList<>(super.getIncrementalIterators(database,
605606
new ConfiguredAirbyteCatalog().withStreams(
606607
cursorBasedStreamsCategorised.remainingStreams()

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java

+118-35
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.source.postgres.ctid;
66

7+
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcSnapshotForceShutdownMessage;
78
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
89
import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.EIGHT_KB;
910
import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.GIGABYTE;
@@ -14,7 +15,9 @@
1415
import com.google.common.base.Preconditions;
1516
import com.google.common.collect.AbstractIterator;
1617
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
18+
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
1719
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils;
20+
import io.airbyte.commons.exceptions.TransientErrorException;
1821
import io.airbyte.commons.stream.AirbyteStreamUtils;
1922
import io.airbyte.commons.util.AutoCloseableIterator;
2023
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -25,6 +28,8 @@
2528
import java.sql.Connection;
2629
import java.sql.PreparedStatement;
2730
import java.sql.SQLException;
31+
import java.time.Duration;
32+
import java.time.Instant;
2833
import java.util.ArrayList;
2934
import java.util.LinkedList;
3035
import java.util.List;
@@ -67,6 +72,10 @@ public class InitialSyncCtidIterator extends AbstractIterator<RowDataWithCtid> i
6772
private boolean subQueriesInitialized = false;
6873
private final boolean tidRangeScanCapableDBServer;
6974

75+
private final Instant startInstant;
76+
private Optional<Duration> cdcInitialLoadTimeout;
77+
private boolean isCdcSync;
78+
7079
public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
7180
final JdbcDatabase database,
7281
final CtidPostgresSourceOperations sourceOperations,
@@ -79,7 +88,9 @@ public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
7988
final int maxTuple,
8089
final FileNodeHandler fileNodeHandler,
8190
final boolean tidRangeScanCapableDBServer,
82-
final boolean useTestPageSize) {
91+
final boolean useTestPageSize,
92+
final Instant startInstant,
93+
final Optional<Duration> cdcInitialLoadTimeout) {
8394
this.airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
8495
this.blockSize = blockSize;
8596
this.maxTuple = maxTuple;
@@ -95,11 +106,23 @@ public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
95106
this.tableSize = tableSize;
96107
this.tidRangeScanCapableDBServer = tidRangeScanCapableDBServer;
97108
this.useTestPageSize = useTestPageSize;
109+
this.startInstant = startInstant;
110+
this.cdcInitialLoadTimeout = cdcInitialLoadTimeout;
111+
this.isCdcSync = isCdcSync(ctidStateManager);
98112
}
99113

100114
@CheckForNull
101115
@Override
102116
protected RowDataWithCtid computeNext() {
117+
if (isCdcSync && cdcInitialLoadTimeout.isPresent()
118+
&& Duration.between(startInstant, Instant.now()).compareTo(cdcInitialLoadTimeout.get()) > 0) {
119+
final String cdcInitialLoadTimeoutMessage = String.format(
120+
"Initial load for table %s has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
121+
getAirbyteStream().get(), cdcInitialLoadTimeout.get());
122+
LOGGER.info(cdcInitialLoadTimeoutMessage);
123+
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage());
124+
throw new TransientErrorException(cdcInitialLoadTimeoutMessage);
125+
}
103126
try {
104127
if (!subQueriesInitialized) {
105128
initSubQueries();
@@ -323,11 +346,26 @@ public PreparedStatement createCtidLegacyQueryStatement(final Connection connect
323346
}
324347
}
325348

349+
@Override
350+
public Optional<AirbyteStreamNameNamespacePair> getAirbyteStream() {
351+
return Optional.of(airbyteStream);
352+
}
353+
326354
@Override
327355
public void close() throws Exception {
328356
if (currentIterator != null) {
329357
currentIterator.close();
330358
}
331359
}
332360

361+
private boolean isCdcSync(CtidStateManager initialLoadStateManager) {
362+
if (initialLoadStateManager instanceof CtidGlobalStateManager) {
363+
LOGGER.info("Running a cdc sync");
364+
return true;
365+
} else {
366+
LOGGER.info("Not running a cdc sync");
367+
return false;
368+
}
369+
}
370+
333371
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,12 @@ public PostgresCtidHandler(final JsonNode config,
8080
this.tidRangeScanCapableDBServer = CtidUtils.isTidRangeScanCapableDBServer(database);
8181
}
8282

83+
@NotNull
8384
@Override
8485
public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull ConfiguredAirbyteStream airbyteStream,
8586
@NotNull TableInfo<CommonField<PostgresType>> table,
86-
@NotNull Instant emittedAt) {
87+
@NotNull Instant emittedAt,
88+
@NotNull Optional<Duration> cdcInitialLoadTimeout) {
8789
final AirbyteStream stream = airbyteStream.getStream();
8890
final String streamName = stream.getName();
8991
final String namespace = stream.getNamespace();
@@ -100,7 +102,7 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull Confi
100102
table.getName(),
101103
tableBlockSizes.get(pair).tableSize(),
102104
tableBlockSizes.get(pair).blockSize(),
103-
tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair));
105+
tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair), emittedAt, cdcInitialLoadTimeout);
104106
final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator =
105107
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
106108
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream);
@@ -110,9 +112,10 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull Confi
110112
public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
111113
final ConfiguredAirbyteCatalog catalog,
112114
final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
113-
final Instant emmitedAt,
115+
final Instant emittedAt,
114116
final boolean decorateWithStartedStatus,
115-
final boolean decorateWithCompletedStatus) {
117+
final boolean decorateWithCompletedStatus,
118+
final Optional<Duration> cdcInitialLoadTimeout) {
116119
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
117120
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
118121
final AirbyteStream stream = airbyteStream.getStream();
@@ -134,7 +137,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
134137
// Grab the selected fields to sync
135138
final TableInfo<CommonField<PostgresType>> table = tableNameToTable
136139
.get(fullyQualifiedTableName);
137-
final var iterator = getIteratorForStream(airbyteStream, table, emmitedAt);
140+
final var iterator = getIteratorForStream(airbyteStream, table, emittedAt, cdcInitialLoadTimeout);
138141
iteratorList.add(iterator);
139142

140143
if (decorateWithCompletedStatus) {
@@ -152,12 +155,15 @@ private AutoCloseableIterator<RowDataWithCtid> queryTableCtid(
152155
final String tableName,
153156
final long tableSize,
154157
final long blockSize,
155-
final int maxTuple) {
158+
final int maxTuple,
159+
final Instant emittedAt,
160+
@NotNull final Optional<Duration> cdcInitialLoadTimeout) {
156161

157162
LOGGER.info("Queueing query for table: {}", tableName);
158163
return new InitialSyncCtidIterator(ctidStateManager, database, sourceOperations, quoteString, columnNames, schemaName, tableName, tableSize,
159164
blockSize, maxTuple, fileNodeHandler, tidRangeScanCapableDBServer,
160-
config.has(USE_TEST_CHUNK_SIZE) && config.get(USE_TEST_CHUNK_SIZE).asBoolean());
165+
config.has(USE_TEST_CHUNK_SIZE) && config.get(USE_TEST_CHUNK_SIZE).asBoolean(), emittedAt,
166+
cdcInitialLoadTimeout);
161167
}
162168

163169
// Transforms the given iterator to create an {@link AirbyteRecordMessage}

airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json

+10
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,16 @@
304304
"enum": ["Fail sync", "Re-sync data"],
305305
"default": "Fail sync",
306306
"order": 9
307+
},
308+
"initial_load_timeout_hours": {
309+
"type": "integer",
310+
"title": "Initial Load Timeout in Hours (Advanced)",
311+
"description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
312+
"default": 8,
313+
"min": 4,
314+
"max": 24,
315+
"order": 10,
316+
"always_show": true
307317
}
308318
}
309319
},

airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_cloud_deployment_spec.json

+10
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,16 @@
305305
"enum": ["Fail sync", "Re-sync data"],
306306
"default": "Fail sync",
307307
"order": 9
308+
},
309+
"initial_load_timeout_hours": {
310+
"type": "integer",
311+
"title": "Initial Load Timeout in Hours (Advanced)",
312+
"description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
313+
"default": 8,
314+
"min": 4,
315+
"max": 24,
316+
"order": 10,
317+
"always_show": true
308318
}
309319
}
310320
},

airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json

+10
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,16 @@
304304
"enum": ["Fail sync", "Re-sync data"],
305305
"default": "Fail sync",
306306
"order": 9
307+
},
308+
"initial_load_timeout_hours": {
309+
"type": "integer",
310+
"title": "Initial Load Timeout in Hours (Advanced)",
311+
"description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
312+
"default": 8,
313+
"min": 4,
314+
"max": 24,
315+
"order": 10,
316+
"always_show": true
307317
}
308318
}
309319
},

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

+29-29
Original file line numberDiff line numberDiff line change
@@ -269,13 +269,26 @@ protected void validateStreamStateInResumableFullRefresh(final JsonNode streamSt
269269
@Override
270270
protected void assertStateMessagesForNewTableSnapshotTest(final List<? extends AirbyteStateMessage> stateMessages,
271271
final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) {
272+
// First message emitted in the WASS case is a CDC state message. This should have a different
273+
// global state (LSN) as compared to the previous
274+
// finishing state. The only stream in the snapshot phase should be the one that had completed at that point.
272275
assertEquals(7, stateMessages.size(), stateMessages.toString());
273-
for (int i = 0; i <= 4; i++) {
276+
final AirbyteStateMessage cdcStateMessage = stateMessages.get(0);
277+
assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(), cdcStateMessage.getGlobal().getSharedState());
278+
Set<StreamDescriptor> streamsInSnapshotState = cdcStateMessage.getGlobal().getStreamStates()
279+
.stream()
280+
.map(AirbyteStreamState::getStreamDescriptor)
281+
.collect(Collectors.toSet());
282+
assertEquals(1, streamsInSnapshotState.size());
283+
assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(modelsSchema())));
284+
285+
for (int i = 1; i <= 5; i++) {
274286
final AirbyteStateMessage stateMessage = stateMessages.get(i);
275-
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessage.getType());
276-
assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
277-
stateMessage.getGlobal().getSharedState());
278-
final Set<StreamDescriptor> streamsInSnapshotState = stateMessage.getGlobal().getStreamStates()
287+
// Shared state should be the same as in the first (CDC) state message, as it should not change in
288+
// initial sync.
289+
assertEquals(cdcStateMessage.getGlobal().getSharedState(), stateMessage.getGlobal().getSharedState());
290+
streamsInSnapshotState.clear();
291+
streamsInSnapshotState = stateMessage.getGlobal().getStreamStates()
279292
.stream()
280293
.map(AirbyteStreamState::getStreamDescriptor)
281294
.collect(Collectors.toSet());
@@ -296,37 +309,24 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List<? extends A
296309
});
297310
}
298311

299-
final AirbyteStateMessage secondLastSateMessage = stateMessages.get(5);
300-
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, secondLastSateMessage.getType());
301-
assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
302-
secondLastSateMessage.getGlobal().getSharedState());
303-
final Set<StreamDescriptor> streamsInSnapshotState = secondLastSateMessage.getGlobal().getStreamStates()
312+
// The last message emitted should indicate that the initial ctid load has finished for both streams.
313+
final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6);
314+
assertEquals(AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
315+
assertEquals(cdcStateMessage.getGlobal().getSharedState(),
316+
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
317+
streamsInSnapshotState.clear();
318+
streamsInSnapshotState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates()
304319
.stream()
305320
.map(AirbyteStreamState::getStreamDescriptor)
306321
.collect(Collectors.toSet());
307322
assertEquals(2, streamsInSnapshotState.size());
308323
assertTrue(
309324
streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomSchema())));
310325
assertTrue(streamsInSnapshotState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(modelsSchema())));
311-
secondLastSateMessage.getGlobal().getStreamStates().forEach(s -> {
326+
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates().forEach(s -> {
312327
final JsonNode streamState = s.getStreamState();
313328
assertFalse(streamState.has(STATE_TYPE_KEY));
314329
});
315-
316-
final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6);
317-
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
318-
assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
319-
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
320-
final Set<StreamDescriptor> streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates()
321-
.stream()
322-
.map(AirbyteStreamState::getStreamDescriptor)
323-
.collect(Collectors.toSet());
324-
assertEquals(2, streamsInSnapshotState.size());
325-
assertTrue(
326-
streamsInSyncCompletionState.contains(
327-
new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomSchema())));
328-
assertTrue(streamsInSyncCompletionState.contains(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(modelsSchema())));
329-
assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());
330330
}
331331

332332
@Test
@@ -449,10 +449,10 @@ public void testTwoStreamSync() throws Exception {
449449
assertNotNull(global.getSharedState());
450450
assertEquals(2, global.getStreamStates().size());
451451

452-
if (i <= 3) {
452+
if (i <= 4) {
453453
final StreamDescriptor finalFirstStreamInState = firstStreamInState;
454454
global.getStreamStates().forEach(c -> {
455-
// First 4 state messages are ctid state for the stream that didn't complete ctid sync the first
455+
// First 5 state messages are ctid state for the stream that didn't complete ctid sync the first
456456
// time
457457
if (c.getStreamDescriptor().equals(finalFirstStreamInState)) {
458458
assertFalse(c.getStreamState().has(STATE_TYPE_KEY));
@@ -462,7 +462,7 @@ public void testTwoStreamSync() throws Exception {
462462
}
463463
});
464464
} else {
465-
// last 2 state messages don't contain ctid info cause ctid sync should be complete
465+
// the last state message doesn't contain ctid info because the ctid sync should be complete
466466
global.getStreamStates().forEach(c -> assertFalse(c.getStreamState().has(STATE_TYPE_KEY)));
467467
}
468468
}

0 commit comments

Comments
 (0)