Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream status on postgres #38716

Merged
merged 59 commits into from
May 29, 2024
Merged

Stream status on postgres #38716

merged 59 commits into from
May 29, 2024

Conversation

xiaohansong
Copy link
Contributor

@xiaohansong xiaohansong commented May 28, 2024

What

Source postgres will now send start and complete stream status

How

Review guide

User Impact

Can this PR be safely reverted and rolled back?

  • YES πŸ’š
  • NO ❌

@octavia-squidington-iii octavia-squidington-iii added area/documentation Improvements or additions to documentation and removed CDK Connector Development Kit labels May 28, 2024
@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label May 28, 2024
@xiaohansong xiaohansong changed the title Xiaohan/postgresstream Stream status on postgres May 28, 2024
@xiaohansong xiaohansong marked this pull request as ready for review May 28, 2024 21:45
@xiaohansong xiaohansong requested a review from a team as a code owner May 28, 2024 21:45
Copy link
Contributor

@akashkulk akashkulk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just remember to publish the new CDK and bump cdk version + add to changelog

if (initialSyncCtidIterators.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
return Stream.of(Collections.singletonList(incrementalIteratorSupplier.get()), allStreamsCompleteStatusEmitters).flatMap(Collection::stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you will miss here emitting STARTED to incremental streams that are already in CDC sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this! Fixed and added a test

@@ -1,7 +1,6 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-postgres:dev
test_strictness_level: high
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: why do we remove this line? what tests would fail with this level of strictness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It runs basic_read in this acceptance test but we need to remove it since this connector won't pass it as of today. with this line setting strictness level high, it requires us to have basic_read test enabled. So we'll have to remove this line too.

@@ -524,7 +528,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);

final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt));
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be better to declare those two booleans values as (global) constant variables and pass them to the function for better readability:

static final boolean USE_START_STATUS = true;
static final boolean USE_COMPLETE_STATUS = true;
ctidHandler.getInitialSyncCtidIterator(..., USE_START_STATUS, USE_COMPLETE_STATUS); 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added inline comment instead - since USE_START_STATUS or USE_COMPLETE_STATUS can be true or false in different scenarios. For readability purpose it's probably more clear to have readers know what that true or false stands for.

@@ -568,7 +572,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(
cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid),
tableNameToTable,
emittedAt));
emittedAt, true, true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@@ -217,7 +222,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
}

initialSyncCtidIterators.addAll(ctidHandler.getInitialSyncCtidIterator(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt));
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

ctidHandler.getInitialSyncCtidIterator(
          new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, USE_START_STATUS, !USE_COMPLETE_STATUS)

@xiaohansong xiaohansong merged commit b104667 into master May 29, 2024
31 checks passed
@xiaohansong xiaohansong deleted the xiaohan/postgresstream branch May 29, 2024 20:05
xiaohansong added a commit that referenced this pull request May 29, 2024
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation CDK Connector Development Kit connectors/source/postgres
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants