Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comments about intermediate state emission #16262

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,30 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
private final AirbyteStreamNameNamespacePair pair;
private final String cursorField;
private final JsonSchemaPrimitive cursorType;
private final int stateEmissionFrequency;

private final String initialCursor;
private String maxCursor;
private boolean hasEmittedFinalState;

// The intermediateStateMessage is set to the latest state message.
// For every stateEmissionFrequency messages, emitIntermediateState is set to true and
// the latest intermediateStateMessage will be emitted.
/**
* These parameters are for intermediate state message emission. We can emit an intermediate state
* when the following two conditions are met.
* <p/>
* 1. The records are sorted by the cursor field. This is true when {@code stateEmissionFrequency} >
* 0. This logic is guaranteed in {@code AbstractJdbcSource#queryTableIncremental}, in which an
* "ORDER BY" clause is appended to the SQL query if {@code stateEmissionFrequency} > 0.
* <p/>
* 2. There is a cursor value that is ready for emission. A cursor value is "ready" if there is no
* more record with the same value. We cannot emit a cursor at will, because there may be multiple
* records with the same cursor value. If we emit a cursor ignoring this condition, should the sync
* fail right after the emission, the next sync may skip some records with the same cursor value due
* to "WHERE cursor_field > cursor" in {@code AbstractJdbcSource#queryTableIncremental}.
* <p/>
* The {@code intermediateStateMessage} is set to the latest state message that is ready for
* emission. For every {@code stateEmissionFrequency} messages, {@code emitIntermediateState} is set
* to true and the latest "ready" state will be emitted in the next {@code computeNext} call.
*/
private final int stateEmissionFrequency;
private int totalRecordCount = 0;
private boolean emitIntermediateState = false;
private AirbyteMessage intermediateStateMessage = null;
Expand All @@ -47,9 +62,11 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
* @param cursorField Path to the comparator field used to track the records read so far
* @param initialCursor name of the initial cursor column
* @param cursorType ENUM type of primitive values that can be used as a cursor for checkpointing
* @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every
* stateEmissionFrequency records. Only emit intermediate states if the records are sorted by
* the cursor field.
* @param stateEmissionFrequency If larger than 0, the records are sorted by the cursor field, and
* intermediate states will be emitted for every {@code stateEmissionFrequency} records. The
* order of the records is guaranteed in {@code AbstractJdbcSource#queryTableIncremental}, in
* which an "ORDER BY" clause is appended to the SQL query if {@code stateEmissionFrequency}
* > 0.
*/
public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
final StateManager stateManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All changes in this file are format changes from spotless.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -75,6 +74,7 @@ private static AirbyteMessage createStateMessage(final String recordValue) {

private Iterator<AirbyteMessage> createExceptionIterator() {
return new Iterator<AirbyteMessage>() {

final Iterator<AirbyteMessage> internalMessageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2,
RECORD_MESSAGE_2, RECORD_MESSAGE_3);

Expand All @@ -88,7 +88,8 @@ public AirbyteMessage next() {
if (internalMessageIterator.hasNext()) {
return internalMessageIterator.next();
} else {
// this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a SQLException is thrown and wrapped in
// this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a
// SQLException is thrown and wrapped in
// StreamingJdbcDatabase#tryAdvance
throw new RuntimeException(new SQLException("Connection marked broken because of SQLSTATE(080006)", "08006"));
}
Expand Down Expand Up @@ -186,10 +187,12 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() {
1);
assertEquals(RECORD_MESSAGE_1, iterator.next());
assertEquals(RECORD_MESSAGE_2, iterator.next());
// continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the condition of "ready"
// continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the
// condition of "ready"
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(RECORD_MESSAGE_3, iterator.next());
// emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the frequency minimum of 1 record
// emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the
// frequency minimum of 1 record
assertEquals(STATE_MESSAGE_2, iterator.next());
// no further records to read since Exception was caught above and marked iterator as endOfData()
assertFalse(iterator.hasNext());
Expand All @@ -210,8 +213,10 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyZero() {
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(RECORD_MESSAGE_3, iterator.next());
// since stateEmission is not set to emit frequently, this will catch the error but not emit state message since it wasn't in a ready state
// of having a frequency > 0 but will prevent an exception from causing the iterator to fail by marking iterator as endOfData()
// since stateEmission is not set to emit frequently, this will catch the error but not emit state
// message since it wasn't in a ready state
// of having a frequency > 0 but will prevent an exception from causing the iterator to fail by
// marking iterator as endOfData()
assertFalse(iterator.hasNext());
}

Expand Down