-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream status on postgres #38716
Stream status on postgres #38716
Conversation
β¦ck to full refresh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just remember to publish the new CDK and bump cdk version + add to changelog
if (initialSyncCtidIterators.isEmpty()) { | ||
return Collections.singletonList(incrementalIteratorSupplier.get()); | ||
return Stream.of(Collections.singletonList(incrementalIteratorSupplier.get()), allStreamsCompleteStatusEmitters).flatMap(Collection::stream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you will miss here emitting STARTED to incremental streams that are already in CDC sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this! Fixed and added a test
@@ -1,7 +1,6 @@ | |||
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) | |||
# for more information about how to configure these tests | |||
connector_image: airbyte/source-postgres:dev | |||
test_strictness_level: high |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious: why do we remove this line? what tests would fail with this level of strictness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It runs basic_read
in this acceptance test but we need to remove it since this connector won't pass it as of today. with this line setting strictness level high, it requires us to have basic_read
test enabled. So we'll have to remove this line too.
@@ -524,7 +528,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final | |||
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager); | |||
|
|||
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator( | |||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); | |||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be better to declare those two booleans values as (global) constant variables and pass them to the function for better readability:
static final boolean USE_START_STATUS = true;
static final boolean USE_COMPLETE_STATUS = true;
ctidHandler.getInitialSyncCtidIterator(..., USE_START_STATUS, USE_COMPLETE_STATUS);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added inline comment instead - since USE_START_STATUS
or USE_COMPLETE_STATUS
can be true or false in different scenarios. For readability purpose it's probably more clear to have readers know what that true or false stands for.
@@ -568,7 +572,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final | |||
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>( | |||
cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), | |||
tableNameToTable, | |||
emittedAt)); | |||
emittedAt, true, true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
@@ -217,7 +222,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin | |||
} | |||
|
|||
initialSyncCtidIterators.addAll(ctidHandler.getInitialSyncCtidIterator( | |||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); | |||
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
ctidHandler.getInitialSyncCtidIterator(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, USE_START_STATUS, !USE_COMPLETE_STATUS)
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
What
Source postgres will now send start and complete stream status
How
Review guide
User Impact
Can this PR be safely reverted and rolled back?