
[Source-Postgres]: Implement WASS algorithm #41649

Merged: 60 commits, merged Jul 17, 2024
Changes shown from 56 commits

Commits (60)
9c36109
Initial Commit
akashkulk May 15, 2024
5ed8d39
Merge branch 'master' into akash/wass-algo
akashkulk Jun 3, 2024
4816c58
Fix config error
akashkulk May 21, 2024
22d4674
Revert integration runner
akashkulk Jun 3, 2024
2eaadf3
commit
akashkulk Jun 4, 2024
45b4b57
fix
akashkulk Jun 4, 2024
4d38d55
duration fix
akashkulk Jun 5, 2024
a7ab494
timing
akashkulk Jun 6, 2024
e45ec75
Include partially completed streams
akashkulk Jun 10, 2024
83b9aab
Merge branch 'master' into akash/wass-algo
akashkulk Jun 11, 2024
777f7bc
Merge branch 'master' into akash/wass-algo
akashkulk Jun 11, 2024
6912e30
Merge branch 'master' into akash/wass-algo
akashkulk Jun 17, 2024
aab6ea2
revert
akashkulk Jun 17, 2024
5b83880
Fixes
akashkulk Jun 17, 2024
9bdd2a0
Merge branch 'master' into akash/wass-algo
akashkulk Jun 17, 2024
7fbbbd6
Filter out streams
akashkulk Jun 17, 2024
5231b60
Merge branch 'master' into akash/wass-algo
akashkulk Jun 29, 2024
b20700f
Add stream status emissions + config
akashkulk Jun 30, 2024
f8de8af
Fix unit tests
akashkulk Jul 2, 2024
78d0377
Merge branch 'master' into akash/wass-algo
akashkulk Jul 2, 2024
134d395
Bump versions
akashkulk Jul 2, 2024
aa045b1
Merge branch 'master' into akash/wass-algo
akashkulk Jul 2, 2024
4059b17
Add cdc config for initial load timeout
akashkulk Jul 2, 2024
e698fa8
FIx unit tests
akashkulk Jul 2, 2024
16b349b
Merge branch 'master' into akash/wass-algo
akashkulk Jul 2, 2024
6a33ce5
Update airbyte-integrations/connectors/source-mysql/src/main/java/io/…
akashkulk Jul 3, 2024
0a40758
Address comments
akashkulk Jul 3, 2024
894aa6c
Merge branch 'master' into akash/wass-algo
akashkulk Jul 3, 2024
f11dc87
remove comment
akashkulk Jul 3, 2024
6364f9e
Fix excessive logging
akashkulk Jul 3, 2024
c1aef67
Fix
akashkulk Jul 3, 2024
ccd7f6b
Merge branch 'master' into akash/wass-algo
akashkulk Jul 8, 2024
1fd46cc
Fixes
akashkulk Jul 9, 2024
16ce774
Merge branch 'master' into akash/wass-algo
akashkulk Jul 9, 2024
9781923
Address comments
akashkulk Jul 10, 2024
9bacf66
Merge branch 'master' into akash/wass-algo
akashkulk Jul 10, 2024
44093a0
Update changelog
akashkulk Jul 11, 2024
67ac347
Merge branch 'master' into akash/wass-algo
akashkulk Jul 11, 2024
37a9c41
Update readme
akashkulk Jul 11, 2024
9ace639
Initial
akashkulk Jul 11, 2024
72cc8cd
Merge branch 'master' into akash/pg-wass
akashkulk Jul 11, 2024
e397a2f
Merge branch 'master' into akash/pg-wass
akashkulk Jul 13, 2024
f5d4c88
TODO
akashkulk Jul 11, 2024
e76dc24
unit tests
akashkulk Jul 14, 2024
3b4e15b
Merge branch 'master' into akash/pg-wass
akashkulk Jul 14, 2024
7c1ec74
Fix stream status issues
akashkulk Jul 15, 2024
ac3dd7a
Merge branch 'master' into akash/pg-wass
akashkulk Jul 15, 2024
ad6427a
Remove postgres tests
akashkulk Jul 15, 2024
2a1f7f8
Merge branch 'master' into akash/pg-wass
akashkulk Jul 15, 2024
92202ae
Remove dups
akashkulk Jul 15, 2024
f875b0a
bump changelog
akashkulk Jul 15, 2024
a04e557
Merge branch 'master' into akash/pg-wass
akashkulk Jul 15, 2024
07b5844
Fix comment
akashkulk Jul 16, 2024
1b25c8c
Fix
akashkulk Jul 16, 2024
d524ce9
Merge branch 'master' into akash/pg-wass
akashkulk Jul 16, 2024
43bbe3e
Fix error
akashkulk Jul 16, 2024
d613527
Fix
akashkulk Jul 16, 2024
2266084
Fix
akashkulk Jul 16, 2024
6199e63
Merge branch 'master' into akash/pg-wass
akashkulk Jul 17, 2024
b230131
Bump cdk
akashkulk Jul 17, 2024
@@ -1 +1 @@
-version=0.41.4
+version=0.41.4
@@ -99,8 +99,8 @@ protected AirbyteRecordData computeNext() {
     if (isCdcSync && cdcInitialLoadTimeout.isPresent()
         && Duration.between(startInstant, Instant.now()).compareTo(cdcInitialLoadTimeout.get()) > 0) {
       final String cdcInitialLoadTimeoutMessage = String.format(
-          "Initial load for table %s has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
-          getAirbyteStream().get(), cdcInitialLoadTimeout.get());
+          "Initial load has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
+          cdcInitialLoadTimeout.get());
       LOGGER.info(cdcInitialLoadTimeoutMessage);
       AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage());
       throw new TransientErrorException(cdcInitialLoadTimeoutMessage);
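The guard in that hunk is the core of the change: a CDC initial load that overruns its configured timeout aborts with a transient error so log replication can catch up, and the snapshot resumes on the next attempt. A minimal standalone sketch of the condition (not Airbyte's actual class; the method name here is illustrative):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Standalone sketch of the timeout guard added in this PR. Mirrors the
// condition in computeNext(): only CDC syncs with a configured timeout
// are cut short, and only once the elapsed time exceeds that timeout.
public class InitialLoadTimeoutGuardSketch {

  // Returns true when the initial load should be aborted; callers then
  // throw a TransientErrorException so the next attempt resumes snapshotting.
  static boolean shouldAbort(final boolean isCdcSync,
                             final Optional<Duration> cdcInitialLoadTimeout,
                             final Instant startInstant,
                             final Instant now) {
    return isCdcSync
        && cdcInitialLoadTimeout.isPresent()
        && Duration.between(startInstant, now).compareTo(cdcInitialLoadTimeout.get()) > 0;
  }
}
```

Throwing a transient (rather than fatal) error is what makes the shutdown safe: the platform retries, CDC catches up from the saved state, and the partially completed snapshot continues.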
@@ -12,7 +12,7 @@ java {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.40.10'
+    cdkVersionRequired = '0.41.2'
     features = ['db-sources', 'datastore-postgres']
     useLocalCdk = false
 }
@@ -9,7 +9,7 @@ data:
   connectorSubtype: database
   connectorType: source
   definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
-  dockerImageTag: 3.4.26
+  dockerImageTag: 3.5.0
   dockerRepository: airbyte/source-postgres
   documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
   githubIssueLabel: source-postgres
@@ -556,7 +556,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
             * decorateWithStartedStatus=
             */ true, /*
                       * decorateWithCompletedStatus=
-                      */ true));
+                      */ true,
+            Optional.empty()));
     final List<AutoCloseableIterator<AirbyteMessage>> xminIterators = new ArrayList<>(xminHandler.getIncrementalIterators(
         new ConfiguredAirbyteCatalog().withStreams(xminStreams.streamsForXminSync()), tableNameToTable, emittedAt));
 
@@ -600,7 +601,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
     final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(
         cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid),
             tableNameToTable,
-            emittedAt, /* decorateWithStartedStatus= */ true, /* decorateWithCompletedStatus= */ true));
+            emittedAt, /* decorateWithStartedStatus= */ true, /* decorateWithCompletedStatus= */ true, Optional.empty()));
     final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterators = new ArrayList<>(super.getIncrementalIterators(database,
         new ConfiguredAirbyteCatalog().withStreams(
             cursorBasedStreamsCategorised.remainingStreams()

Large diffs are not rendered by default.

@@ -4,6 +4,7 @@
 
 package io.airbyte.integrations.source.postgres.ctid;
 
+import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcSnapshotForceShutdownMessage;
 import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
 import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.EIGHT_KB;
 import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.GIGABYTE;
@@ -14,7 +15,9 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.AbstractIterator;
 import io.airbyte.cdk.db.jdbc.JdbcDatabase;
+import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
 import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils;
+import io.airbyte.commons.exceptions.TransientErrorException;
 import io.airbyte.commons.stream.AirbyteStreamUtils;
 import io.airbyte.commons.util.AutoCloseableIterator;
 import io.airbyte.commons.util.AutoCloseableIterators;
@@ -25,6 +28,8 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -67,6 +72,10 @@ public class InitialSyncCtidIterator extends AbstractIterator<RowDataWithCtid> i
   private boolean subQueriesInitialized = false;
   private final boolean tidRangeScanCapableDBServer;
 
+  private final Instant startInstant;
+  private Optional<Duration> cdcInitialLoadTimeout;
+  private boolean isCdcSync;
+
   public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
                                  final JdbcDatabase database,
                                  final CtidPostgresSourceOperations sourceOperations,
@@ -79,7 +88,9 @@ public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
                                  final int maxTuple,
                                  final FileNodeHandler fileNodeHandler,
                                  final boolean tidRangeScanCapableDBServer,
-                                 final boolean useTestPageSize) {
+                                 final boolean useTestPageSize,
+                                 final Instant startInstant,
+                                 final Optional<Duration> cdcInitialLoadTimeout) {
     this.airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
     this.blockSize = blockSize;
     this.maxTuple = maxTuple;
@@ -95,11 +106,23 @@ public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
     this.tableSize = tableSize;
     this.tidRangeScanCapableDBServer = tidRangeScanCapableDBServer;
     this.useTestPageSize = useTestPageSize;
+    this.startInstant = startInstant;
+    this.cdcInitialLoadTimeout = cdcInitialLoadTimeout;
+    this.isCdcSync = isCdcSync(ctidStateManager);
   }
 
   @CheckForNull
   @Override
   protected RowDataWithCtid computeNext() {
+    if (isCdcSync && cdcInitialLoadTimeout.isPresent()
+        && Duration.between(startInstant, Instant.now()).compareTo(cdcInitialLoadTimeout.get()) > 0) {
+      final String cdcInitialLoadTimeoutMessage = String.format(
+          "Initial load for table %s has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
+          getAirbyteStream().get(), cdcInitialLoadTimeout.get());
+      LOGGER.info(cdcInitialLoadTimeoutMessage);
+      AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage());
+      throw new TransientErrorException(cdcInitialLoadTimeoutMessage);
+    }
     try {
       if (!subQueriesInitialized) {
         initSubQueries();
@@ -330,4 +353,14 @@ public void close() throws Exception {
     }
   }
 
+  private boolean isCdcSync(CtidStateManager initialLoadStateManager) {
+    if (initialLoadStateManager instanceof CtidGlobalStateManager) {
+      LOGGER.info("Running a cdc sync");
+      return true;
+    } else {
+      LOGGER.info("Not running a cdc sync");
+      return false;
+    }
+  }
+
 }
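The `isCdcSync` helper added at the bottom of the file decides whether the timeout guard applies at all: only a global (CDC) state manager triggers it. A minimal sketch of that instanceof-based dispatch, using stand-in types (the non-global manager name here is an assumption, not taken from this diff):

```java
// Sketch of the CDC-detection check: the timeout only applies when the
// iterator was constructed with the global (CDC) state manager. The two
// concrete classes below are stand-ins for Airbyte's state managers.
public class CdcSyncDetectionSketch {

  interface CtidStateManager {}

  static class CtidGlobalStateManager implements CtidStateManager {}

  // Hypothetical stand-in for any non-CDC (per-stream) state manager.
  static class PerStreamStateManager implements CtidStateManager {}

  // A global state manager implies a CDC sync; any other manager does not.
  static boolean isCdcSync(final CtidStateManager stateManager) {
    return stateManager instanceof CtidGlobalStateManager;
  }
}
```

Computing this once in the constructor, as the PR does, keeps the per-row `computeNext()` hot path to a boolean check rather than a repeated instanceof test.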
@@ -80,10 +80,12 @@ public PostgresCtidHandler(final JsonNode config,
     this.tidRangeScanCapableDBServer = CtidUtils.isTidRangeScanCapableDBServer(database);
   }
 
+  @NotNull
   @Override
   public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull ConfiguredAirbyteStream airbyteStream,
                                                                     @NotNull TableInfo<CommonField<PostgresType>> table,
-                                                                    @NotNull Instant emittedAt) {
+                                                                    @NotNull Instant emittedAt,
+                                                                    @NotNull Optional<Duration> cdcInitialLoadTimeout) {
     final AirbyteStream stream = airbyteStream.getStream();
     final String streamName = stream.getName();
     final String namespace = stream.getNamespace();
@@ -100,7 +102,7 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull Confi
         table.getName(),
         tableBlockSizes.get(pair).tableSize(),
         tableBlockSizes.get(pair).blockSize(),
-        tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair));
+        tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair), emittedAt, cdcInitialLoadTimeout);
     final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator =
         getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
     final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream);
@@ -110,9 +112,10 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull Confi
   public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
                                                                                 final ConfiguredAirbyteCatalog catalog,
                                                                                 final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
-                                                                                final Instant emmitedAt,
+                                                                                final Instant emittedAt,
                                                                                 final boolean decorateWithStartedStatus,
-                                                                                final boolean decorateWithCompletedStatus) {
+                                                                                final boolean decorateWithCompletedStatus,
+                                                                                final Optional<Duration> cdcInitialLoadTimeout) {
     final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
     for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
       final AirbyteStream stream = airbyteStream.getStream();
@@ -134,7 +137,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
       // Grab the selected fields to sync
      final TableInfo<CommonField<PostgresType>> table = tableNameToTable
           .get(fullyQualifiedTableName);
-      final var iterator = getIteratorForStream(airbyteStream, table, emmitedAt);
+      final var iterator = getIteratorForStream(airbyteStream, table, emittedAt, cdcInitialLoadTimeout);
       iteratorList.add(iterator);
 
       if (decorateWithCompletedStatus) {
@@ -152,12 +155,15 @@ private AutoCloseableIterator<RowDataWithCtid> queryTableCtid(
                                                                 final String tableName,
                                                                 final long tableSize,
                                                                 final long blockSize,
-                                                                final int maxTuple) {
+                                                                final int maxTuple,
+                                                                final Instant emittedAt,
+                                                                @NotNull final Optional<Duration> cdcInitialLoadTimeout) {
 
     LOGGER.info("Queueing query for table: {}", tableName);
     return new InitialSyncCtidIterator(ctidStateManager, database, sourceOperations, quoteString, columnNames, schemaName, tableName, tableSize,
         blockSize, maxTuple, fileNodeHandler, tidRangeScanCapableDBServer,
-        config.has(USE_TEST_CHUNK_SIZE) && config.get(USE_TEST_CHUNK_SIZE).asBoolean());
+        config.has(USE_TEST_CHUNK_SIZE) && config.get(USE_TEST_CHUNK_SIZE).asBoolean(), emittedAt,
+        cdcInitialLoadTimeout);
   }
 
   // Transforms the given iterator to create an {@link AirbyteRecordMessage}
@@ -304,6 +304,16 @@
       "enum": ["Fail sync", "Re-sync data"],
       "default": "Fail sync",
       "order": 9
-    }
+    },
+    "initial_load_timeout_hours": {
+      "type": "integer",
+      "title": "Initial Load Timeout in Hours (Advanced)",
+      "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
+      "default": 8,
+      "min": 4,
+      "max": 24,
+      "order": 10,
+      "always_show": true
+    }
   }
 },
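The new `initial_load_timeout_hours` spec field (default 8, min 4, max 24) is what ultimately becomes the `Optional<Duration>` threaded through the handlers above. A hypothetical helper sketching that resolution, with the clamping mirroring the bounds advertised in the spec (the helper name and clamping behavior are illustrative, not taken from this PR):

```java
import java.time.Duration;

// Hypothetical resolution of the `initial_load_timeout_hours` spec field
// into a Duration. The default and bounds come from the spec in this PR;
// the clamping strategy itself is an assumption for illustration.
public class InitialLoadTimeoutConfigSketch {

  static final int DEFAULT_HOURS = 8;
  static final int MIN_HOURS = 4;
  static final int MAX_HOURS = 24;

  // Missing config falls back to the default; out-of-range values are
  // clamped to the min/max advertised in the spec.
  static Duration resolve(final Integer configuredHours) {
    final int hours = (configuredHours == null) ? DEFAULT_HOURS : configuredHours;
    return Duration.ofHours(Math.max(MIN_HOURS, Math.min(MAX_HOURS, hours)));
  }
}
```

With `"always_show": true` and an `(Advanced)` title, the option stays visible in the connector form without forcing most users to change it.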
@@ -305,6 +305,16 @@
       "enum": ["Fail sync", "Re-sync data"],
       "default": "Fail sync",
       "order": 9
-    }
+    },
+    "initial_load_timeout_hours": {
+      "type": "integer",
+      "title": "Initial Load Timeout in Hours (Advanced)",
+      "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
+      "default": 8,
+      "min": 4,
+      "max": 24,
+      "order": 10,
+      "always_show": true
+    }
   }
 },
@@ -304,6 +304,16 @@
       "enum": ["Fail sync", "Re-sync data"],
       "default": "Fail sync",
       "order": 9
-    }
+    },
+    "initial_load_timeout_hours": {
+      "type": "integer",
+      "title": "Initial Load Timeout in Hours (Advanced)",
+      "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
+      "default": 8,
+      "min": 4,
+      "max": 24,
+      "order": 10,
+      "always_show": true
+    }
   }
 },