-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
improve error message for tables with invalid columns as cursor #15317
Changes from all commits
d60b2ec
87258a3
5321946
667ce0a
b0a90fa
f498ce1
20c0d8e
e51470c
52431d0
682e530
9732314
39a51c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import io.airbyte.integrations.BaseConnector; | ||
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.integrations.base.Source; | ||
import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo; | ||
import io.airbyte.integrations.source.relationaldb.models.DbState; | ||
import io.airbyte.integrations.source.relationaldb.state.StateManager; | ||
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; | ||
|
@@ -50,6 +51,7 @@ | |
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
@@ -136,6 +138,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, | |
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function | ||
.identity())); | ||
|
||
validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog); | ||
|
||
final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators = | ||
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt); | ||
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators = | ||
|
@@ -153,6 +157,42 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, | |
}); | ||
} | ||
|
||
private void validateCursorFieldForIncrementalTables(final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) { | ||
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>(); | ||
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { | ||
final AirbyteStream stream = airbyteStream.getStream(); | ||
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), | ||
stream.getName()); | ||
final boolean hasSourceDefinedCursor = | ||
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) && airbyteStream.getStream().getSourceDefinedCursor(); | ||
if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for my understanding: is there any situation where |
||
continue; | ||
} | ||
|
||
final TableInfo<CommonField<DataType>> table = tableNameToTable | ||
.get(fullyQualifiedTableName); | ||
final Optional<String> cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream); | ||
if (cursorField.isEmpty()) { | ||
continue; | ||
} | ||
final DataType cursorType = table.getFields().stream() | ||
.filter(info -> info.getName().equals(cursorField.get())) | ||
.map(CommonField::getType) | ||
.findFirst() | ||
.orElseThrow(); | ||
|
||
if (!isValidCursorType(cursorType)) { | ||
tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), cursorType.toString())); | ||
} | ||
} | ||
|
||
if (!tablesWithInvalidCursor.isEmpty()) { | ||
throw new InvalidCursorException(tablesWithInvalidCursor); | ||
} | ||
} | ||
|
||
protected abstract boolean isValidCursorType(final DataType cursorType); | ||
|
||
protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception { | ||
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces(); | ||
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package io.airbyte.integrations.source.relationaldb; | ||
|
||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
public class InvalidCursorException extends RuntimeException { | ||
|
||
public InvalidCursorException(final List<InvalidCursorInfo> tablesWithInvalidCursor) { | ||
super("The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) | ||
.collect(Collectors.joining(","))); | ||
} | ||
|
||
public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) { | ||
|
||
@Override | ||
public String toString() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tiny nitpick: I'd prefer to define a new method |
||
return "{" + | ||
"tableName='" + tableName + '\'' + | ||
", cursorColumnName='" + cursorColumnName + '\'' + | ||
", cursorSqlType=" + cursorSqlType + | ||
'}'; | ||
} | ||
} | ||
|
||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless I'm mistaken, shouldn't this method have
throws InvalidCursorException
at the end to indicate that it could throw an Exception?Another note is can we have a short javadoc comment that says something along the lines of
EDIT: After chatting with Ed on this, since
InvalidCursorException
extendsRuntimeException
you don't need to define this in the method, preference would still be to have this defined either as a javadoc comment or within the method to just know in case the future someone wants to catch this Exception