DiscoverSchema endpoints calculates diff and breaking change #18571

Merged
merged 12 commits into from
Nov 2, 2022

Conversation

alovew
Contributor

@alovew alovew commented Oct 27, 2022

The catalog diff calculation moves to the DiscoverSchema endpoint. If a breaking change is detected, the connection is updated with breakingChange=true.

API changes:

  • SourceDiscoverSchemaRequestBody, the input to the DiscoverSchema endpoint, now has a connectionId
  • SourceDiscoverSchemaRead, the response from the DiscoverSchema endpoint, now returns a catalogDiff
  • The ConnectionUpdate object now has breakingChange, so connections can be updated with this data

@github-actions github-actions bot added area/api Related to the api area/documentation Improvements or additions to documentation area/platform issues related to the platform area/server labels Oct 27, 2022
@alovew
Contributor Author

alovew commented Oct 27, 2022

Currently writing tests for SchedulerHandler

@@ -202,14 +208,9 @@ public ConfigurationApi(final ConfigRepository configRepository,
 jobPersistence,
 workerEnvironment,
 logConfigs,
-eventRunner);
+eventRunner, connectionsHandler);
Contributor

Nit, this addition should be on its own line.

-return retrieveDiscoveredSchema(persistedCatalogId);
+final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);
+
+if (discoverSchemaRequestBody.getConnectionId() != null) {
Contributor

When can this be null? Should we log an error if so?

Contributor Author

I believe this endpoint is also called when connections are created, in order to discover schemas. In that case we don't need to pass in the connectionId, since there is no existing sync catalog to compare against.
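
As a rough illustration of the behavior described in this reply, the sketch below skips the diff when no connectionId is supplied. `DiscoverResult`, `discover`, and the use of plain stream-name lists are hypothetical simplifications for illustration; they are not the real SourceDiscoverSchemaRead or catalog types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class DiscoverSchemaFlowSketch {

  // Hypothetical, simplified response: a list of stream names plus a stand-in for catalogDiff.
  static final class DiscoverResult {
    final List<String> discoveredStreams;
    final List<String> removedStreams; // null when no diff was computed

    DiscoverResult(final List<String> discoveredStreams, final List<String> removedStreams) {
      this.discoveredStreams = discoveredStreams;
      this.removedStreams = removedStreams;
    }
  }

  static DiscoverResult discover(final List<String> discoveredStreams,
                                 final UUID connectionId,
                                 final List<String> currentStreams) {
    if (connectionId == null) {
      // Connection is being created: there is no existing catalog to compare against.
      return new DiscoverResult(discoveredStreams, null);
    }
    // Existing connection: compute a (very simplified) diff against its current streams.
    final List<String> removed = new ArrayList<>(currentStreams);
    removed.removeAll(discoveredStreams);
    return new DiscoverResult(discoveredStreams, removed);
  }

  public static void main(final String[] args) {
    final List<String> discovered = List.of("users");
    System.out.println(discover(discovered, null, List.of()).removedStreams); // null
    System.out.println(discover(discovered, UUID.randomUUID(), List.of("users", "orders")).removedStreams); // [orders]
  }
}
```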

if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
return;
}
streamTransform.getUpdateStream().stream().forEach(fieldTransform -> {
Contributor

Nit: we should consider takeWhile here instead of forEach, so that we can break out of the loop as soon as possible. I wonder whether it could have an impact on big catalogs.

Contributor Author

I'm not sure exactly how this would work. I think takeWhile continues to operate on each item until it reaches something that does not match the predicate, and then it will not operate on that item, so I'm not sure we could actually set isBreaking to true in that case. But maybe you're thinking about a different way of doing this?
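
A small sketch of the takeWhile behavior described here, using plain booleans in place of the field transforms' breaking flags (the class and method names are made up for illustration). The first element that fails the predicate is dropped along with everything after it, so downstream code never sees the breaking element itself.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TakeWhileSketch {

  // Returns the elements that pass through takeWhile(flag -> !flag).
  static List<Boolean> visitedByTakeWhile(final List<Boolean> breakingFlags) {
    return breakingFlags.stream()
        .takeWhile(flag -> !flag) // stops at, and excludes, the first breaking flag
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    // The 'true' element and everything after it never reach the downstream stage.
    System.out.println(visitedByTakeWhile(List.of(false, false, true, false))); // [false, false]
  }
}
```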

Contributor Author

I just updated it to use anyMatch instead, which is probably cleaner and, I think, does break out early.
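
To check the early-exit claim, here is a minimal sketch that counts how many elements a sequential stream inspects before anyMatch returns. The counter in peek exists only to make the short-circuit observable; it is not something the PR code would need, and the names are invented for this example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AnyMatchSketch {

  // Counts the elements pulled through the pipeline before anyMatch finishes.
  static int elementsInspected(final List<Boolean> breakingFlags) {
    final AtomicInteger inspected = new AtomicInteger();
    breakingFlags.stream()
        .peek(flag -> inspected.incrementAndGet()) // demonstration-only counter
        .anyMatch(Boolean::booleanValue);          // short-circuits on the first true
    return inspected.get();
  }

  public static void main(final String[] args) {
    System.out.println(elementsInspected(List.of(false, true, false, false))); // 2
    System.out.println(elementsInspected(List.of(false, false, false)));       // 3
  }
}
```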

@@ -409,4 +434,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
return jobConverter.getJobInfoRead(job);
}

private boolean containsBreakingChange(final CatalogDiff diff) {
AtomicBoolean isBreaking = new AtomicBoolean(false);
diff.getTransforms().stream().forEach(streamTransform -> {
Contributor

Nit: this looks more like a reduce operation than a map. In Java it would be written like

.reduce(false,
(acc, value) -> acc || value.getBreaking(),
(left, right) -> left || right
)

I find it more explicit about what this is doing, but I don't know what the rest of the team thinks.

Contributor Author

I find this more confusing

Contributor

Not sure if that's better...

diff.getTransforms().stream()
    .filter(streamTransform -> streamTransform.getTransformType() == TransformTypeEnum.UPDATE_STREAM)
    .flatMap(streamTransform -> streamTransform.getUpdateStream().stream().map(FieldTransform::getBreaking))
    .anyMatch(Boolean::booleanValue)

Contributor

Looks better to me. What I would like to avoid is having state within the function used in the stream API.

Contributor Author

@benmoriceau why do we want to avoid that?

Contributor

I generally think of stream notation as a more functional approach; in that paradigm, mutable state and side effects are generally something to avoid.

Contributor

What Jimmy is saying. Ideally, when using this, we should be able to take the anonymous function in the forEach call and move it to its own method, which is not the case here. I would personally prefer a for loop, like for (A a : as), if the anonymous functions are not stateless.

Contributor

Synced up on Zoom about that
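
For reference, the stateless options raised in this thread (a plain for loop, a filter/flatMap/anyMatch pipeline, and a reduce) could look roughly like the sketch below. `TransformType`, `FieldTransform`, and `StreamTransform` are simplified stand-ins written for this example, not the generated API models, and the method names are hypothetical. The thread does not record which variant was chosen after the Zoom sync.

```java
import java.util.List;

public class BreakingChangeSketch {

  enum TransformType { ADD_STREAM, REMOVE_STREAM, UPDATE_STREAM }

  // Simplified stand-ins: only the getters used in this thread are modeled.
  static final class FieldTransform {
    private final boolean breaking;
    FieldTransform(final boolean breaking) { this.breaking = breaking; }
    boolean getBreaking() { return breaking; }
  }

  static final class StreamTransform {
    private final TransformType type;
    private final List<FieldTransform> updateStream;
    StreamTransform(final TransformType type, final List<FieldTransform> updateStream) {
      this.type = type;
      this.updateStream = updateStream;
    }
    TransformType getTransformType() { return type; }
    List<FieldTransform> getUpdateStream() { return updateStream; }
  }

  // Option 1: for loop that returns as soon as a breaking field transform is found.
  static boolean containsBreakingChangeLoop(final List<StreamTransform> transforms) {
    for (final StreamTransform streamTransform : transforms) {
      if (streamTransform.getTransformType() != TransformType.UPDATE_STREAM) {
        continue;
      }
      for (final FieldTransform fieldTransform : streamTransform.getUpdateStream()) {
        if (fieldTransform.getBreaking()) {
          return true;
        }
      }
    }
    return false;
  }

  // Option 2: stateless stream pipeline; no AtomicBoolean captured by the lambdas.
  static boolean containsBreakingChangeStream(final List<StreamTransform> transforms) {
    return transforms.stream()
        .filter(st -> st.getTransformType() == TransformType.UPDATE_STREAM)
        .flatMap(st -> st.getUpdateStream().stream())
        .anyMatch(FieldTransform::getBreaking);
  }

  // Option 3: reduce; also stateless, but it always visits every element.
  static boolean containsBreakingChangeReduce(final List<StreamTransform> transforms) {
    return transforms.stream()
        .filter(st -> st.getTransformType() == TransformType.UPDATE_STREAM)
        .flatMap(st -> st.getUpdateStream().stream())
        .reduce(false, (acc, ft) -> acc || ft.getBreaking(), (left, right) -> left || right);
  }

  public static void main(final String[] args) {
    final List<StreamTransform> diff = List.of(
        new StreamTransform(TransformType.ADD_STREAM, List.of()),
        new StreamTransform(TransformType.UPDATE_STREAM,
            List.of(new FieldTransform(false), new FieldTransform(true))));
    System.out.println(containsBreakingChangeLoop(diff));   // true
    System.out.println(containsBreakingChangeStream(diff)); // true
    System.out.println(containsBreakingChangeReduce(diff)); // true
  }
}
```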

@alovew alovew requested a review from benmoriceau October 27, 2022 23:48
Contributor

@gosusnp gosusnp left a comment

LGTM, some nits and questions.

@@ -368,12 +367,14 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff);
}

-private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId)
+private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId)
Contributor

nit: since this function actually fetches the schema every time, refreshSchema feels more explicit. This is a bit out of scope; I'll probably rename it at some point.

"transformType" : "add_stream",
"updateStream" : [ {
"updateFieldSchema" : { },
"fieldName" : [ "fieldName", "fieldName" ],
Contributor

Discovering the JSON here: why do we repeat "fieldName" twice?

@alovew
Contributor Author

alovew commented Oct 28, 2022

@gosusnp I made a somewhat significant change after you approved: the SourceDiscoverSchemaRead object now has a breakingChange field, so we can set that on the connectionRead object in the WebBackendConnectionsHandler without refetching the connection object after updating it in the DiscoverSchema endpoint.

Contributor

@benmoriceau benmoriceau left a comment

LGTM overall. I have some comments about code organization that might not need to be addressed in this review.

throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
Contributor

Nit: should this be final (same for the diff)? If you had to reset your IntelliJ workspace recently, it is likely that the save actions plugin configuration was lost.

@@ -116,7 +124,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JsonSchemaValidator jsonSchemaValidator,
final JobPersistence jobPersistence,
final EventRunner eventRunner,
-final JobConverter jobConverter) {
+final JobConverter jobConverter,
+final ConnectionsHandler connectionsHandler) {
Contributor

Not in scope for this review, but to keep in mind for later: I think discoverSchemaForSourceFromSourceId should be moved to the SourceHandler. We should get rid of the scheduler handler at some point (we don't have a scheduler anymore).

.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
Contributor

Not for this review: getDiff could be part of the CatalogHelper, since it doesn't interact with the persistence layer. The goal would be to avoid having one handler depend on another.

.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
Contributor

What's the use case for having a catalog fallback? This feels like something that could be worth documenting here.

Contributor Author

I just copy pasted this to move it from WebBackendConnectionsHandler, so I'm not 100% sure. I think it might be in case we can't find a source catalog for a particular connection, then we fall back on the configured airbyte catalog that exists for that connection, but I'm not sure about the case where that would actually happen.
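
One side note on the fallback shape in the hunk above: Optional.orElse takes an already-computed value, so the fallback catalog is fetched even when the source catalog is present, whereas orElseGet defers the fetch until it is needed. The sketch below illustrates that difference with strings; `loadConfiguredCatalog` and the counter are hypothetical stand-ins, not Airbyte code, and whether the extra fetch matters here is an open question.

```java
import java.util.Optional;

public class CatalogFallbackSketch {

  // Counts how often the (hypothetical) fallback lookup runs.
  static int fallbackLoads = 0;

  // Stand-in for fetching the connection's configured sync catalog.
  static String loadConfiguredCatalog() {
    fallbackLoads++;
    return "configured-catalog";
  }

  // Same shape as the diff: the fallback is fetched up front, then passed to orElse.
  static String resolveEager(final Optional<String> sourceCatalog) {
    final String fallback = loadConfiguredCatalog();
    return sourceCatalog.orElse(fallback);
  }

  // Lazy variant: the fallback is only fetched when the Optional is empty.
  static String resolveLazy(final Optional<String> sourceCatalog) {
    return sourceCatalog.orElseGet(CatalogFallbackSketch::loadConfiguredCatalog);
  }

  public static void main(final String[] args) {
    System.out.println(resolveEager(Optional.of("source-catalog"))); // source-catalog
    System.out.println(fallbackLoads);                               // 1
    System.out.println(resolveLazy(Optional.of("source-catalog")));  // source-catalog
    System.out.println(fallbackLoads);                               // 1
    System.out.println(resolveLazy(Optional.empty()));               // configured-catalog
  }
}
```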

Contributor

In this case, can we move this code to some shared helper somewhere? Spreading unclear logic feels like a trap for our future selves.

Contributor Author

@benmoriceau made a comment above about moving getDiff to the CatalogHelper:

"Not for this review: getDiff could be part of the CatalogHelper, since it doesn't interact with the persistence layer. The goal would be to avoid having one handler depend on another."

Could we do that in a follow-up, since I think it will be a larger change? I could add it here, though it would make this a big PR.

Contributor

I'm ok with the follow up. Do we have a ticket for it?

@alovew alovew force-pushed the anne/discover-schema-endpoint branch from c37cb18 to 697f202 Compare October 31, 2022 20:11
@alovew alovew requested a review from gosusnp October 31, 2022 22:45
Contributor

@benmoriceau benmoriceau left a comment

LGTM, I just replied to some of the open discussions.

@alovew alovew requested a review from benmoriceau November 1, 2022 19:39
Member

@colesnodgrass colesnodgrass left a comment

Usage of Optional.get() should be avoided.

Comment on lines +354 to +355
diff = refreshedCatalog.get().getCatalogDiff();
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange());
Member

Calling .get() on an Optional.empty() will throw a NoSuchElementException. What is the expected behavior here if refreshedCatalog = Optional.empty(); (from line 333) was the path taken?

Contributor Author

On line 339 we have if (refreshedCatalog.isPresent()), and all of this is nested under that, so I think that should make this OK?
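
To make the two positions in this exchange concrete, the sketch below shows a guarded isPresent()/get() block next to an ifPresent variant that never calls get(). `RefreshedSchema` and `ConnectionView` are hypothetical stand-ins for the objects in the diff. Both variants behave the same; the second just removes the possibility of a NoSuchElementException if the guard is later moved or deleted.

```java
import java.util.Optional;

public class OptionalAccessSketch {

  // Hypothetical stand-in for the refreshed schema response.
  static final class RefreshedSchema {
    final String catalogDiff;
    final boolean breakingChange;
    RefreshedSchema(final String catalogDiff, final boolean breakingChange) {
      this.catalogDiff = catalogDiff;
      this.breakingChange = breakingChange;
    }
  }

  // Hypothetical stand-in for the connection being built for the response.
  static final class ConnectionView {
    String catalogDiff = "none";
    boolean breakingChange = false;
  }

  // Guarded variant: get() is safe only because of the enclosing isPresent() check.
  static ConnectionView applyGuarded(final Optional<RefreshedSchema> refreshed) {
    final ConnectionView view = new ConnectionView();
    if (refreshed.isPresent()) {
      view.catalogDiff = refreshed.get().catalogDiff;
      view.breakingChange = refreshed.get().breakingChange;
    }
    return view;
  }

  // ifPresent variant: the lambda only runs when a value exists, so get() is never needed.
  static ConnectionView applyIfPresent(final Optional<RefreshedSchema> refreshed) {
    final ConnectionView view = new ConnectionView();
    refreshed.ifPresent(schema -> {
      view.catalogDiff = schema.catalogDiff;
      view.breakingChange = schema.breakingChange;
    });
    return view;
  }

  public static void main(final String[] args) {
    final Optional<RefreshedSchema> present = Optional.of(new RefreshedSchema("field removed", true));
    System.out.println(applyGuarded(present).breakingChange);            // true
    System.out.println(applyIfPresent(present).breakingChange);          // true
    System.out.println(applyIfPresent(Optional.empty()).catalogDiff);    // none
  }
}
```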

@alovew alovew requested a review from colesnodgrass November 1, 2022 22:45
@alovew alovew merged commit d26e5bc into master Nov 2, 2022
@alovew alovew deleted the anne/discover-schema-endpoint branch November 2, 2022 21:10
letiescanciano added a commit that referenced this pull request Nov 3, 2022
* master: (38 commits)
  New Source: Gridly (#18342)
  🎉 New Source: Alpha Vantage (#18320)
  ci_integration_test.sh: cut GITHUB_STEP_SUMMARY (#18895)
  🎉 New Source: Datadog [python cdk] (#18150)
  Hide Reject all button in consent dialog (#18596)
  feat: add doc url to track event (#18690)
  fix: install java in oss catalog deploy action (#18887)
  [CI] Speed up check_images_exist (#18873)
  Extract open API (#18879)
  Remove unused interfaces (#18880)
  add action for deploying oss connector catalog to GCS (#18633)
  feat: generate full connector catalog json (#18562)
  Add unsupported_protocol_version column to Connection (#18876)
  Extract OAuth API (#18818)
  update images to have non-transparent background (#18874)
  DiscoverSchema endpoints calculates diff and breaking change (#18571)
  Validate protocol version on connector update (#18639)
  Bmoric/extract notification api (#18812)
  Show version and changelog status for affected connectors (#18845)
  Bmoric/extract logs api (#18621)
  ...
Labels
area/api Related to the api area/documentation Improvements or additions to documentation area/platform issues related to the platform area/server

5 participants