Skip to content

Commit

Permalink
move state manager inside incremental block
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Dec 3, 2020
1 parent f9eceb8 commit 775e2ce
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ private List<Table<?>> discoverInternal(final Database database) throws Exceptio

@Override
public Stream<AirbyteMessage> read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception {
final JdbcStateManager stateManager = new JdbcStateManager(Jsons.object(state, JdbcState.class));
final Instant now = Instant.now();

final Database database = createDatabase(config);
Expand Down Expand Up @@ -185,6 +184,7 @@ public Stream<AirbyteMessage> read(JsonNode config, ConfiguredAirbyteCatalog cat

final Stream<AirbyteMessage> stream;
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
final JdbcStateManager stateManager = new JdbcStateManager(Jsons.object(state, JdbcState.class));
final String cursorField = IncrementalUtils.getCursorField(airbyteStream);
final JsonSchemaPrimitive cursorType = IncrementalUtils.getCursorType(airbyteStream, cursorField);
final Optional<String> initialCursorOptional = stateManager.getOriginalCursor(streamName);
Expand Down

0 comments on commit 775e2ce

Please sign in to comment.