Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream status on postgres #38716

Merged
merged 59 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
8420789
save work
xiaohansong May 8, 2024
ba2cd97
save
xiaohansong May 10, 2024
9736561
save work
xiaohansong May 10, 2024
cc1a4e0
Merge remote-tracking branch 'origin/master' into xiaohan/streamtest
xiaohansong May 10, 2024
4eace2c
Merge remote-tracking branch 'origin/master' into xiaohan/streamtest
xiaohansong May 10, 2024
71844fa
mongo test
xiaohansong May 10, 2024
85fa32a
Initial commit
rodireich May 14, 2024
51e2a8d
Send statuses for non-resumable full refresh
rodireich May 15, 2024
2ed1447
save work
xiaohansong May 16, 2024
e31e48d
checkin
xiaohansong May 16, 2024
c2a8f98
working cdc
rodireich May 20, 2024
1cddbdc
working cdc with non resumable full refresh
rodireich May 20, 2024
d7f42b9
working cdc with resumable full refresh
rodireich May 20, 2024
53658da
working cursor based incremental
rodireich May 20, 2024
966259e
fix tests
rodireich May 20, 2024
b80ff14
fix a case when a missing cursor causes incremental stream to fall ba…
rodireich May 21, 2024
41f6b35
fix tests
rodireich May 21, 2024
fa2ff4c
fix tests
rodireich May 21, 2024
b9344e5
fix tests
rodireich May 21, 2024
d6e04ef
version bump
rodireich May 21, 2024
2208e17
version bump
rodireich May 21, 2024
c36ba15
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 21, 2024
c69749f
Merge branch 'xiaohan/streamtest' into 7641-stream-status-messages-pr…
rodireich May 21, 2024
cbd577e
fix tests
rodireich May 21, 2024
eeeaadc
fix tests. Disabled one recent change to started failing for some reason
rodireich May 21, 2024
a24e1a6
fix tests
rodireich May 21, 2024
7bcc0f6
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 21, 2024
00441a8
sanity: rename status emitter
rodireich May 21, 2024
3447f17
sanity: move status emitter iterator to cdk
rodireich May 21, 2024
6a848a6
sanity
rodireich May 21, 2024
6ae4514
sanity
rodireich May 21, 2024
99a08be
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 22, 2024
4b841f2
sanity
rodireich May 21, 2024
228b26c
sanity
rodireich May 22, 2024
c5e88d6
fix build
rodireich May 22, 2024
ce15370
formatter
rodireich May 22, 2024
5a3caac
sanity
rodireich May 22, 2024
613935b
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 22, 2024
d074730
cdk version bump
rodireich May 22, 2024
77b2a8d
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 23, 2024
15d3de6
bump cdk version
rodireich May 23, 2024
d0311e3
toll back changes to mongodb tests
rodireich May 23, 2024
e737759
Merge branch 'master' of github.com:airbytehq/airbyte into 7641-strea…
rodireich May 23, 2024
0e40742
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 23, 2024
35b0f32
Merge branch 'master' into 7641-stream-status-messages-prototype
rodireich May 23, 2024
96b0d93
save work
xiaohansong May 23, 2024
3efc4e1
Merge remote-tracking branch 'origin/master' into xiaohan/postgresstream
xiaohansong May 23, 2024
80d7cf4
save work
xiaohansong May 24, 2024
361362c
save
xiaohansong May 28, 2024
63515e5
Merge remote-tracking branch 'origin/master' into xiaohan/postgresstream
xiaohansong May 28, 2024
11e50d8
version and format
xiaohansong May 28, 2024
70db1ae
upgrade cdk dependency
xiaohansong May 28, 2024
f79df2a
relax acceptance test because we do not pass stream status test
xiaohansong May 28, 2024
524596b
Merge branch 'master' into xiaohan/postgresstream
xiaohansong May 28, 2024
ccf2493
fix for xmin
xiaohansong May 28, 2024
efa5bb1
format
xiaohansong May 28, 2024
555e740
fix bug and add a test
xiaohansong May 29, 2024
f2ef8e6
comment
xiaohansong May 29, 2024
d51f7e0
Merge branch 'master' into xiaohan/postgresstream
xiaohansong May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
expectedStreamStatus: AirbyteStreamStatusTraceMessage
) {
var actualMessage = allMessages[idx]
Assertions.assertEquals(AirbyteMessage.Type.TRACE, actualMessage.type)
Assertions.assertEquals(
AirbyteMessage.Type.TRACE,
actualMessage.type,
"[Debug] all Message: $allMessages"
)
var traceMessage = actualMessage.trace
Assertions.assertNotNull(traceMessage.streamStatus)
Assertions.assertEquals(expectedStreamStatus, traceMessage.streamStatus)
Expand Down Expand Up @@ -576,6 +580,25 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
val secondBatchIterator = source().read(config()!!, configuredCatalog, state)
val dataFromSecondBatch = AutoCloseableIterators.toListAndClose(secondBatchIterator)

assertStreamStatusTraceMessageIndex(
0,
dataFromSecondBatch,
createAirbteStreanStatusTraceMessage(
modelsSchema(),
MODELS_STREAM_NAME,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED
)
)
assertStreamStatusTraceMessageIndex(
dataFromSecondBatch.size - 1,
dataFromSecondBatch,
createAirbteStreanStatusTraceMessage(
modelsSchema(),
MODELS_STREAM_NAME,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
)
)

val stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch)
assertExpectedStateMessagesFromIncrementalSync(stateAfterSecondBatch)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
)
val actualMessages = MoreIterators.toList(source().read(config(), catalog, null))
var actualMessages = MoreIterators.toList(source().read(config(), catalog, null))

assertStreamStatusTraceMessageIndex(
0,
Expand Down Expand Up @@ -469,11 +469,19 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
setTraceEmittedAtToNull(actualMessages)
setTraceEmittedAtToNull(expectedMessages)

actualMessages = removeStateMessage(actualMessages)

Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
}

private fun removeStateMessage(airbyteMessages: List<AirbyteMessage>): List<AirbyteMessage> {
var mutableListMessages = airbyteMessages.toMutableList()
mutableListMessages.removeIf { message -> message.type == AirbyteMessage.Type.STATE }
return mutableListMessages
}

protected open val airbyteMessagesReadOneColumn: MutableList<AirbyteMessage>
get() {
val expectedMessages =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-postgres:dev
test_strictness_level: high
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: why do we remove this line? what tests would fail with this level of strictness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It runs basic_read in this acceptance test but we need to remove it since this connector won't pass it as of today. with this line setting strictness level high, it requires us to have basic_read test enabled. So we'll have to remove this line too.

custom_environment_variables:
USE_STREAM_CAPABLE_STATE: true
acceptance_tests:
Expand Down Expand Up @@ -32,16 +31,6 @@ acceptance_tests:
- config_path: "secrets/config_cdc.json"
backward_compatibility_tests_config:
disable_for_version: "2.1.1"
basic_read:
tests:
- config_path: "secrets/config.json"
expect_records:
path: "integration_tests/expected_records.txt"
validate_state_messages: false
- config_path: "secrets/config_cdc.json"
expect_records:
path: "integration_tests/expected_records.txt"
validate_state_messages: false
full_refresh:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.35.4'
cdkVersionRequired = '0.35.12'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.7
dockerImageTag: 3.4.8
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.ResultWithFailed;
import io.airbyte.integrations.source.postgres.cdc.PostgresReplicationConnection;
import io.airbyte.integrations.source.postgres.ctid.CtidGlobalStateManager;
Expand All @@ -96,6 +99,7 @@
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
Expand Down Expand Up @@ -524,7 +528,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);

final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt));
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be better to declare those two booleans values as (global) constant variables and pass them to the function for better readability:

static final boolean USE_START_STATUS = true;
static final boolean USE_COMPLETE_STATUS = true;
ctidHandler.getInitialSyncCtidIterator(..., USE_START_STATUS, USE_COMPLETE_STATUS); 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added inline comment instead - since USE_START_STATUS or USE_COMPLETE_STATUS can be true or false in different scenarios. For readability purpose it's probably more clear to have readers know what that true or false stands for.

final List<AutoCloseableIterator<AirbyteMessage>> xminIterators = new ArrayList<>(xminHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(xminStreams.streamsForXminSync()), tableNameToTable, emittedAt));

Expand Down Expand Up @@ -568,7 +572,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(
cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid),
tableNameToTable,
emittedAt));
emittedAt, true, true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterators = new ArrayList<>(super.getIncrementalIterators(database,
new ConfiguredAirbyteCatalog().withStreams(
cursorBasedStreamsCategorised.remainingStreams()
Expand Down Expand Up @@ -937,6 +941,18 @@ protected void estimateFullRefreshSyncSize(final JdbcDatabase database,
}
}

@Override
public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(final ConfiguredAirbyteStream airbyteStream,
final AutoCloseableIterator<AirbyteMessage> streamIterator) {
final var pair =
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
final var starterStatus =
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED));
final var completeStatus =
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE));
return AutoCloseableIterators.concatWithEagerClose(starterStatus, streamIterator, completeStatus);
}

private List<JsonNode> getFullTableEstimate(final JdbcDatabase database,
final String fullTableName,
final String schemaName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils;
Expand All @@ -36,8 +38,10 @@
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -186,11 +190,12 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
savedOffsetAfterReplicationSlotLSN);
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>();
final List<AirbyteStreamNameNamespacePair> streamsUnderVacuum = new ArrayList<>();
List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid = new ArrayList<>();
if (!ctidStreams.streamsForCtidSync().isEmpty()) {
streamsUnderVacuum.addAll(streamsUnderVacuum(database,
ctidStreams.streamsForCtidSync(), quoteString).result());

List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
finalListOfStreamsToBeSyncedViaCtid =
streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync()
: ctidStreams.streamsForCtidSync().stream()
.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
Expand All @@ -217,7 +222,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
}

initialSyncCtidIterators.addAll(ctidHandler.getInitialSyncCtidIterator(
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt));
new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, true, false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

ctidHandler.getInitialSyncCtidIterator(
          new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt, USE_START_STATUS, !USE_COMPLETE_STATUS)

} else {
LOGGER.info("No streams will be synced via ctid");
}
Expand All @@ -238,8 +243,27 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, new PostgresCdcSavedInfoFetcher(stateToBeUsed), postgresCdcStateHandler);

final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtidInLambda = finalListOfStreamsToBeSyncedViaCtid;
final List<AutoCloseableIterator<AirbyteMessage>> cdcStreamsStartStatusEmitters = catalog.getStreams().stream()
.filter(stream -> !finalListOfStreamsToBeSyncedViaCtidInLambda.contains(stream))
.map(stream -> (AutoCloseableIterator<AirbyteMessage>) new StreamStatusTraceEmitterIterator(
new AirbyteStreamStatusHolder(
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()),
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)))
.toList();

final List<AutoCloseableIterator<AirbyteMessage>> allStreamsCompleteStatusEmitters = catalog.getStreams().stream()
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
.map(stream -> (AutoCloseableIterator<AirbyteMessage>) new StreamStatusTraceEmitterIterator(
new AirbyteStreamStatusHolder(
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()),
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)))
.toList();

if (initialSyncCtidIterators.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
return Stream.of(cdcStreamsStartStatusEmitters, Collections.singletonList(incrementalIteratorSupplier.get()), allStreamsCompleteStatusEmitters)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

if (streamsUnderVacuum.isEmpty()) {
Expand All @@ -248,12 +272,15 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
// We finish the current CDC once the initial snapshot is complete and the next sync starts
// processing the WAL
return Stream
.of(initialSyncCtidIterators, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)))
.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters,
Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)),
allStreamsCompleteStatusEmitters)
.flatMap(Collection::stream)
.collect(Collectors.toList());
} else {
LOGGER.warn("Streams are under vacuuming, not going to process WAL");
return initialSyncCtidIterators;
return Stream.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters, allStreamsCompleteStatusEmitters).flatMap(Collection::stream)
.collect(Collectors.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand All @@ -29,6 +31,7 @@
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -107,7 +110,9 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(@NotNull Confi
public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
final Instant emmitedAt) {
final Instant emmitedAt,
final boolean decorateWithStartedStatus,
final boolean decorateWithCompletedStatus) {
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
Expand All @@ -119,11 +124,23 @@ public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
continue;
}
if (airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) {

final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace);
if (decorateWithStartedStatus) {
iteratorList.add(
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)));
}

// Grab the selected fields to sync
final TableInfo<CommonField<PostgresType>> table = tableNameToTable
.get(fullyQualifiedTableName);
final var iterator = getIteratorForStream(airbyteStream, table, emmitedAt);
iteratorList.add(iterator);

if (decorateWithCompletedStatus) {
iteratorList.add(new StreamStatusTraceEmitterIterator(
new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)));
}
}
}
return iteratorList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand All @@ -28,6 +30,7 @@
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -92,6 +95,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
}

if (airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) {
iteratorList.add(
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatus.STARTED)));

// Grab the selected fields to sync
final TableInfo<CommonField<PostgresType>> table = tableNameToTable
.get(fullyQualifiedTableName);
Expand All @@ -107,6 +113,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair);

iteratorList.add(augmentWithLogs(recordAndMessageIterator, pair, streamName));
iteratorList.add(
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatus.COMPLETE)));
}
}

Expand Down
Loading
Loading