-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DiscoverSchema endpoints calculates diff and breaking change #18571
Conversation
Currently writing tests for SchedulerHandler |
@@ -202,14 +208,9 @@ public ConfigurationApi(final ConfigRepository configRepository, | |||
jobPersistence, | |||
workerEnvironment, | |||
logConfigs, | |||
eventRunner); | |||
eventRunner, connectionsHandler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, this addition should be on its own line.
return retrieveDiscoveredSchema(persistedCatalogId); | ||
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId); | ||
|
||
if (discoverSchemaRequestBody.getConnectionId() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When can this be null. Should we log an error if so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this endpoint is called when connections are created in order to discover schemas, and in this case we don't need to pass in the connection id since there is not an existing sync catalog to compare anything against
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { | ||
return; | ||
} | ||
streamTransform.getUpdateStream().stream().forEach(fieldTransform -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we should consider takeWhile
here instead of foreach
in order to be able to breakout of the loop as soon as possible. I wonder if it can have an impact on big catalogs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure exactly how this would work. I think takeWhile continues to operate on each item until it reaches something that does not match the predicate, and then it will not operate on that field, so I'm not sure we could actually set isBreaking to true in that case..but maybe you're thinking about a different way of doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just updated it to use anyMatch
instead which is probably cleaner and I think does break out early
@@ -409,4 +434,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio | |||
return jobConverter.getJobInfoRead(job); | |||
} | |||
|
|||
private boolean containsBreakingChange(final CatalogDiff diff) { | |||
AtomicBoolean isBreaking = new AtomicBoolean(false); | |||
diff.getTransforms().stream().forEach(streamTransform -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, this looks like more like a reduce operation rather than a map. In Java it would be written like
.collect(false,
(acc, value) -> acc = acc || value.getBreaking(),
(left, right) -> left = left || right
)
I find it more explicit about what this is doing but I don't know what is the opinion of the rest of the team.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this more confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if that's better...
diff.getTransforms().stream()
.filter(streamTransform -> streamTransform.getTransformType() == TransformTypeEnum.UPDATE_STREAM)
.flatMap(streamTransform -> streamTransform.getUpdateStream().stream().map(fieldTransform::getBreaking))
.anyMatch(Boolean::booleanValue)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks better to me. What I would like to avoid is to have state within the function use in the stream API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau why do we want to avoid that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I generally think of stream notation as a more functional approach, in that paradigm having a mutable states/side effects would generally be something to avoid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What jimmy is saying. Ideally when using this we need to be able to take the anonymous function in the foreach call and move it to its own method which is not the case here. I would personally prefer using a for loop like for (A a: as)
if we are not stateless in the anonymous functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Synced up on Zoom about that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, some nits and questions.
airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java
Show resolved
Hide resolved
@@ -368,12 +367,14 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti | |||
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff); | |||
} | |||
|
|||
private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId) | |||
private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since this function is actually going to fetch the schema everytime, feels like refreshSchema
is more explicit. A bit out-of-scope, I'll probably rename this at some point.
airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
Show resolved
Hide resolved
"transformType" : "add_stream", | ||
"updateStream" : [ { | ||
"updateFieldSchema" : { }, | ||
"fieldName" : [ "fieldName", "fieldName" ], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discovering the json here, why do we repeat "fieldName"
twice here?
@gosusnp I made a somewhat significant change after you approved - the SourceDiscoverSchemaRead object now has a breakingChange field, so we can set that on the connectionRead object in the WebBackendConnectionsHandler without refetching the connection object after updating it in the DiscoverSchema endpoint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall. got some comments about code organization that might not be needed for this review.
@@ -409,4 +434,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio | |||
return jobConverter.getJobInfoRead(job); | |||
} | |||
|
|||
private boolean containsBreakingChange(final CatalogDiff diff) { | |||
AtomicBoolean isBreaking = new AtomicBoolean(false); | |||
diff.getTransforms().stream().forEach(streamTransform -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks better to me. What I would like to avoid is to have state within the function use in the stream API.
throws JsonValidationException, ConfigNotFoundException, IOException { | ||
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler | ||
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); | ||
io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit should this be final (Same for the diff)? If you had to reset your intelliJ workspace recently , it is likely that the save actions plugin configuration has been lost.
@@ -116,7 +124,8 @@ public SchedulerHandler(final ConfigRepository configRepository, | |||
final JsonSchemaValidator jsonSchemaValidator, | |||
final JobPersistence jobPersistence, | |||
final EventRunner eventRunner, | |||
final JobConverter jobConverter) { | |||
final JobConverter jobConverter, | |||
final ConnectionsHandler connectionsHandler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for the scope of this review but to keep in mind for later: I think discoverSchemaForSourceFromSourceId
should be move to the SourceHandler. We should get rid of the scheduler handler at some points (we don't have any scheduler anymore).
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); | ||
io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = | ||
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); | ||
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this review: The getDiff could part of the CatalogHelper/. It doesn't interact with the persistenceLayer. The goal would be to avoid having an handler to depends on another one.
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); | ||
io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = | ||
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); | ||
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the use case for having a catalog fallback? This feels like something that could be worth documenting here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just copy pasted this to move it from WebBackendConnectionsHandler, so I'm not 100% sure. I think it might be in case we can't find a source catalog for a particular connection, then we fall back on the configured airbyte catalog that exists for that connection, but I'm not sure about the case where that would actually happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, can we move this code to some shared helper somewhere? Spreading unclear logic feels like a trap for our future selves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau made a comment above to move getDiff to the CatalogsHelper:
"Not for this review: The getDiff could part of the CatalogHelper/. It doesn't interact with the persistenceLayer. The goal would be to avoid having an handler to depends on another one."
could we do that in a followup since I think it will be a larger change? i could add it here though, just will be a big PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with the follow up. Do we have a ticket for it?
c37cb18
to
697f202
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just answered to some open discussion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usage of Optional.get()
should be avoided.
diff = refreshedCatalog.get().getCatalogDiff(); | ||
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling .get()
on an Optional.empty()
will throw a NoSuchElementException
. What is the expected behavior here if refreshedCatalog = Optional.empty();
(from line 333) was the path taken?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on line 339 we have if(refreshedCatalog.isPresent())
so all of this is nested under that - I think that should make this ok?
* master: (38 commits) New Source: Gridly (#18342) 🎉 New Source: Alpha Vantage (#18320) ci_integration_test.sh: cut GITHUB_STEP_SUMMARY (#18895) 🎉 New Source: Datadog [python cdk] (#18150) Hide Reject all button in consent dialog (#18596) feat: add doc url to track event (#18690) fix: install java in oss catalog deploy action (#18887) [CI] Speed up check_images_exist (#18873) Extract open API (#18879) Remove unused interfaces (#18880) add action for deploying oss connector catalog to GCS (#18633) feat: generate full connector catalog json (#18562) Add unsupported_protocol_version column to Connection (#18876) Extract OAuth API (#18818) update images to have non-transparent background (#18874) DiscoverSchema endpoints calculates diff and breaking change (#18571) Validate protocol version on connector update (#18639) Bmoric/extract notification api (#18812) Show version and changelog status for affected connectors (#18845) Bmoric/extract logs api (#18621) ...
Catalog diff calculation is moved to the DiscoverSchema endpoint. If a breaking change is detected, the connection is updated to have breakingChange=true
API changes: