-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream status on postgres #38716
Stream status on postgres #38716
Changes from 57 commits
8420789
ba2cd97
9736561
cc1a4e0
4eace2c
71844fa
85fa32a
51e2a8d
2ed1447
e31e48d
c2a8f98
1cddbdc
d7f42b9
53658da
966259e
b80ff14
41f6b35
fa2ff4c
b9344e5
d6e04ef
2208e17
c36ba15
c69749f
cbd577e
eeeaadc
a24e1a6
7bcc0f6
00441a8
3447f17
6a848a6
6ae4514
99a08be
4b841f2
228b26c
c5e88d6
ce15370
5a3caac
613935b
d074730
77b2a8d
15d3de6
d0311e3
e737759
0e40742
35b0f32
96b0d93
3efc4e1
80d7cf4
361362c
63515e5
11e50d8
70db1ae
f79df2a
524596b
ccf2493
efa5bb1
555e740
f2ef8e6
d51f7e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,12 +66,15 @@ | |
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler; | ||
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; | ||
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; | ||
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator; | ||
import io.airbyte.commons.exceptions.ConfigErrorException; | ||
import io.airbyte.commons.functional.CheckedConsumer; | ||
import io.airbyte.commons.functional.CheckedFunction; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.map.MoreMaps; | ||
import io.airbyte.commons.stream.AirbyteStreamStatusHolder; | ||
import io.airbyte.commons.util.AutoCloseableIterator; | ||
import io.airbyte.commons.util.AutoCloseableIterators; | ||
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.ResultWithFailed; | ||
import io.airbyte.integrations.source.postgres.cdc.PostgresReplicationConnection; | ||
import io.airbyte.integrations.source.postgres.ctid.CtidGlobalStateManager; | ||
|
@@ -96,6 +99,7 @@ | |
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; | ||
import io.airbyte.protocol.models.v0.AirbyteStream; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; | ||
import io.airbyte.protocol.models.v0.ConnectorSpecification; | ||
|
@@ -524,7 +528,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final | |
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager); | ||
|
||
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator( | ||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); | ||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, true)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be better to declare those two booleans values as (global) constant variables and pass them to the function for better readability:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added inline comment instead - since |
||
final List<AutoCloseableIterator<AirbyteMessage>> xminIterators = new ArrayList<>(xminHandler.getIncrementalIterators( | ||
new ConfiguredAirbyteCatalog().withStreams(xminStreams.streamsForXminSync()), tableNameToTable, emittedAt)); | ||
|
||
|
@@ -568,7 +572,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final | |
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>( | ||
cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), | ||
tableNameToTable, | ||
emittedAt)); | ||
emittedAt, true, true)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto. |
||
final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterators = new ArrayList<>(super.getIncrementalIterators(database, | ||
new ConfiguredAirbyteCatalog().withStreams( | ||
cursorBasedStreamsCategorised.remainingStreams() | ||
|
@@ -937,6 +941,18 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database, | |
} | ||
} | ||
|
||
@Override | ||
public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(final ConfiguredAirbyteStream airbyteStream, | ||
final AutoCloseableIterator<AirbyteMessage> streamIterator) { | ||
final var pair = | ||
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); | ||
final var starterStatus = | ||
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)); | ||
final var completeStatus = | ||
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)); | ||
return AutoCloseableIterators.concatWithEagerClose(starterStatus, streamIterator, completeStatus); | ||
} | ||
|
||
private List<JsonNode> getFullTableEstimate(final JdbcDatabase database, | ||
final String fullTableName, | ||
final String schemaName, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,8 +22,10 @@ | |
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; | ||
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState; | ||
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; | ||
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator; | ||
import io.airbyte.commons.exceptions.ConfigErrorException; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.stream.AirbyteStreamStatusHolder; | ||
import io.airbyte.commons.util.AutoCloseableIterator; | ||
import io.airbyte.commons.util.AutoCloseableIterators; | ||
import io.airbyte.integrations.source.postgres.PostgresQueryUtils; | ||
|
@@ -36,8 +38,10 @@ | |
import io.airbyte.protocol.models.CommonField; | ||
import io.airbyte.protocol.models.v0.AirbyteMessage; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; | ||
import io.airbyte.protocol.models.v0.SyncMode; | ||
import java.sql.SQLException; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
|
@@ -186,11 +190,12 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin | |
savedOffsetAfterReplicationSlotLSN); | ||
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(); | ||
final List<AirbyteStreamNameNamespacePair> streamsUnderVacuum = new ArrayList<>(); | ||
List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid = new ArrayList<>(); | ||
if (!ctidStreams.streamsForCtidSync().isEmpty()) { | ||
streamsUnderVacuum.addAll(streamsUnderVacuum(database, | ||
ctidStreams.streamsForCtidSync(), quoteString).result()); | ||
|
||
List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid = | ||
finalListOfStreamsToBeSyncedViaCtid = | ||
streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync() | ||
: ctidStreams.streamsForCtidSync().stream() | ||
.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c))) | ||
|
@@ -217,7 +222,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin | |
} | ||
|
||
initialSyncCtidIterators.addAll(ctidHandler.getInitialSyncCtidIterator( | ||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); | ||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, false)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto.
|
||
} else { | ||
LOGGER.info("No streams will be synced via ctid"); | ||
} | ||
|
@@ -238,8 +243,27 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin | |
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( | ||
propertiesManager, eventConverter, new PostgresCdcSavedInfoFetcher(stateToBeUsed), postgresCdcStateHandler); | ||
|
||
final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtidInLambda = finalListOfStreamsToBeSyncedViaCtid; | ||
final List<AutoCloseableIterator<AirbyteMessage>> cdcStreamsStartStatusEmitters = catalog.getStreams().stream() | ||
.filter(stream -> !finalListOfStreamsToBeSyncedViaCtidInLambda.contains(stream)) | ||
.map(stream -> (AutoCloseableIterator<AirbyteMessage>) new StreamStatusTraceEmitterIterator( | ||
new AirbyteStreamStatusHolder( | ||
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), | ||
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED))) | ||
.toList(); | ||
|
||
final List<AutoCloseableIterator<AirbyteMessage>> allStreamsCompleteStatusEmitters = catalog.getStreams().stream() | ||
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) | ||
.map(stream -> (AutoCloseableIterator<AirbyteMessage>) new StreamStatusTraceEmitterIterator( | ||
new AirbyteStreamStatusHolder( | ||
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), | ||
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE))) | ||
.toList(); | ||
|
||
if (initialSyncCtidIterators.isEmpty()) { | ||
return Collections.singletonList(incrementalIteratorSupplier.get()); | ||
return Stream.of(cdcStreamsStartStatusEmitters, Collections.singletonList(incrementalIteratorSupplier.get()), allStreamsCompleteStatusEmitters) | ||
.flatMap(Collection::stream) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
if (streamsUnderVacuum.isEmpty()) { | ||
|
@@ -248,12 +272,15 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin | |
// We finish the current CDC once the initial snapshot is complete and the next sync starts | ||
// processing the WAL | ||
return Stream | ||
.of(initialSyncCtidIterators, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null))) | ||
.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters, | ||
Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)), | ||
allStreamsCompleteStatusEmitters) | ||
.flatMap(Collection::stream) | ||
.collect(Collectors.toList()); | ||
} else { | ||
LOGGER.warn("Streams are under vacuuming, not going to process WAL"); | ||
return initialSyncCtidIterators; | ||
return Stream.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters, allStreamsCompleteStatusEmitters).flatMap(Collection::stream) | ||
.collect(Collectors.toList()); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious: why do we remove this line? what tests would fail with this level of strictness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It runs
basic_read
in this acceptance test but we need to remove it since this connector won't pass it as of today. with this line setting strictness level high, it requires us to havebasic_read
test enabled. So we'll have to remove this line too.