Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DiscoverSchema endpoints calculates diff and breaking change #18571

Merged
merged 12 commits into from
Nov 2, 2022
9 changes: 9 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2822,6 +2822,9 @@ components:
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
connectionId:
type: string
format: uuid
disable_cache:
type: boolean
SourceUpdate:
Expand Down Expand Up @@ -2883,6 +2886,10 @@ components:
catalogId:
type: string
format: uuid
catalogDiff:
$ref: "#/components/schemas/CatalogDiff"
breakingChange:
type: boolean
SourceSearch:
type: object
properties:
Expand Down Expand Up @@ -3375,6 +3382,8 @@ components:
type: boolean
nonBreakingChangesPreference:
$ref: "#/components/schemas/NonBreakingChangesPreference"
breakingChange:
type: boolean
WebBackendConnectionUpdate:
type: object
description: Used to apply a patch-style update to a connection, which means that null properties remain unchanged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
jobPersistence,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
eventRunner);
eventRunner,
connectionsHandler);

final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ public ConfigurationApi(final ConfigRepository configRepository,

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);

schedulerHandler = new SchedulerHandler(
configRepository,
secretsRepositoryReader,
Expand All @@ -192,14 +198,10 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobPersistence,
workerEnvironment,
logConfigs,
eventRunner);
eventRunner,
connectionsHandler);

stateHandler = new StateHandler(statePersistence);
connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
sourceHandler = new SourceHandler(
configRepository,
secretsRepositoryReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn
if (patch.getGeography() != null) {
sync.setGeography(ApiPojoConverters.toPersistenceGeography(patch.getGeography()));
}

if (patch.getBreakingChange() != null) {
sync.setBreakingChange(patch.getBreakingChange());
}
}

private void validateConnectionPatch(final WorkspaceHelper workspaceHelper, final StandardSync persistedSync, final ConnectionUpdate patch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import com.google.common.hash.Hashing;
import io.airbyte.api.model.generated.AdvancedAuth;
import io.airbyte.api.model.generated.AuthSpecification;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.DestinationDefinitionSpecificationRead;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.DestinationUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.JobConfigType;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoRead;
Expand All @@ -31,6 +34,8 @@
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -70,6 +75,7 @@
import java.util.ArrayList;
import java.util.Optional;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -80,6 +86,7 @@ public class SchedulerHandler {
private static final ImmutableSet<ErrorCode> VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET =
ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING);

private final ConnectionsHandler connectionsHandler;
private final ConfigRepository configRepository;
private final SecretsRepositoryWriter secretsRepositoryWriter;
private final SynchronousSchedulerClient synchronousSchedulerClient;
Expand All @@ -96,7 +103,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionsHandler connectionsHandler) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -105,7 +113,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
new JsonSchemaValidator(),
jobPersistence,
eventRunner,
new JobConverter(workerEnvironment, logConfigs));
new JobConverter(workerEnvironment, logConfigs),
connectionsHandler);
}

@VisibleForTesting
Expand All @@ -116,7 +125,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JsonSchemaValidator jsonSchemaValidator,
final JobPersistence jobPersistence,
final EventRunner eventRunner,
final JobConverter jobConverter) {
final JobConverter jobConverter,
final ConnectionsHandler connectionsHandler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for the scope of this review but to keep in mind for later: I think discoverSchemaForSourceFromSourceId should be moved to the SourceHandler. We should get rid of the scheduler handler at some point (we don't have any scheduler anymore).

this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -125,6 +135,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.jobPersistence = jobPersistence;
this.eventRunner = eventRunner;
this.jobConverter = jobConverter;
this.connectionsHandler = connectionsHandler;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -230,7 +241,13 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
if (currentCatalog.isEmpty() || bustActorCatalogCache) {
final SynchronousResponse<UUID> persistedCatalogId =
synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion, new Version(sourceDef.getProtocolVersion()));
return retrieveDiscoveredSchema(persistedCatalogId);
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);

if (discoverSchemaRequestBody.getConnectionId() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can this be null? Should we log an error if so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this endpoint is called when connections are created in order to discover schemas, and in this case we don't need to pass in the connection id since there is not an existing sync catalog to compare anything against

discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody);
}

return discoveredSchema;
}
final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class);
final SynchronousJobRead emptyJob = new SynchronousJobRead()
Expand Down Expand Up @@ -339,6 +356,22 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

/**
 * Computes the catalog diff between a connection's existing catalog and a freshly discovered schema,
 * and records the outcome on both the connection and the discovery response.
 *
 * <p>The baseline for the diff is the catalog returned by {@code getConnectionAirbyteCatalog} for the
 * connection; when that Optional is empty, the connection's current sync catalog is used instead.
 * After the diff is computed, the connection is updated with the resulting breaking-change flag, and
 * the diff plus the flag are set on {@code discoveredSchema}.
 *
 * @param discoveredSchema the discovery result; mutated in place to carry the catalog diff and the
 *        breaking-change flag
 * @param discoverSchemaRequestBody the discovery request; its connection id selects the connection to
 *        compare against (the visible caller only invokes this method when that id is non-null)
 * @throws JsonValidationException declared by this method; presumably propagated from the
 *         connectionsHandler calls
 * @throws ConfigNotFoundException declared by this method; presumably propagated from the
 *         connectionsHandler calls
 * @throws IOException declared by this method; presumably propagated from the connectionsHandler calls
 */
private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws JsonValidationException, ConfigNotFoundException, IOException {
// May be empty; the connection's sync catalog fetched below then serves as the diff baseline.
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
// The sync catalog currently stored on the connection.
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog();
// NOTE(review): when exactly the baseline catalog is absent is not established here; confirm before relying on the fallback.
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this review: The getDiff could part of the CatalogHelper/. It doesn't interact with the persistenceLayer. The goal would be to avoid having an handler to depends on another one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for having a catalog fallback? This feels like something that could be worth documenting here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just copy pasted this to move it from WebBackendConnectionsHandler, so I'm not 100% sure. I think it might be in case we can't find a source catalog for a particular connection, then we fall back on the configured airbyte catalog that exists for that connection, but I'm not sure about the case where that would actually happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, can we move this code to some shared helper somewhere? Spreading unclear logic feels like a trap for our future selves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau made a comment above to move getDiff to the CatalogsHelper:

"Not for this review: The getDiff could part of the CatalogHelper/. It doesn't interact with the persistenceLayer. The goal would be to avoid having an handler to depends on another one."

could we do that in a followup since I think it will be a larger change? i could add it here though, just will be a big PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with the follow up. Do we have a ticket for it?

CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
connectionsHandler.updateConnection(updateObject);
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange);

}

private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<StandardCheckConnectionOutput> response) {
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead()
.jobInfo(jobConverter.getSynchronousJobRead(response));
Expand Down Expand Up @@ -409,4 +442,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
return jobConverter.getJobInfoRead(job);
}

/**
 * Reports whether a catalog diff contains at least one breaking field change.
 *
 * <p>Only stream transforms of type {@code UPDATE_STREAM} are inspected; every other transform type
 * is skipped. The scan stops at the first updated stream that has a breaking field transform.
 *
 * @param diff the catalog diff whose stream transforms are examined
 * @return {@code true} if any field transform of an updated stream is flagged as breaking,
 *         {@code false} otherwise
 */
private boolean containsBreakingChange(final CatalogDiff diff) {
  for (final StreamTransform streamTransform : diff.getTransforms()) {
    if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
      continue;
    }

    // The breaking flag is presumably a boxed Boolean on the generated model (verify against the
    // generated FieldTransform). Comparing with Boolean.TRUE treats an unset flag as "not breaking"
    // instead of throwing a NullPointerException when the predicate unboxes it.
    final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream()
        .anyMatch(fieldTransform -> Boolean.TRUE.equals(fieldTransform.getBreaking()));
    if (anyBreakingFieldTransforms) {
      return true;
    }
  }

  return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
*/
final Optional<SourceDiscoverSchemaRead> refreshedCatalog;
if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
refreshedCatalog = getRefreshedSchema(connection.getSourceId());
refreshedCatalog = getRefreshedSchema(connection.getSourceId(), connection.getConnectionId());
} else {
refreshedCatalog = Optional.empty();
}
Expand All @@ -351,9 +351,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* but was present at time of configuration will appear in the diff as an added stream which is
* confusing. We need to figure out why source_catalog_id is not always populated in the db.
*/
diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog(),
CatalogConverter.toProtocol(configuredCatalog));

diff = refreshedCatalog.get().getCatalogDiff();
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange());
Comment on lines +354 to +355
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling .get() on an Optional.empty() will throw a NoSuchElementException. What is the expected behavior here if refreshedCatalog = Optional.empty(); (from line 333) was the path taken?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on line 339 we have if(refreshedCatalog.isPresent()) so all of this is nested under that - I think that should make this ok?

} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
Expand All @@ -370,12 +369,14 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff);
}

private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId)
private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since this function is actually going to fetch the schema every time, feels like refreshSchema is more explicit. A bit out-of-scope, I'll probably rename this at some point.

throws JsonValidationException, ConfigNotFoundException, IOException {
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody()
.sourceId(sourceId)
.disableCache(true);
return Optional.ofNullable(schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq));
.disableCache(true)
.connectionId(connectionId);
SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);
return Optional.ofNullable(schemaRead);
}

/**
Expand Down Expand Up @@ -480,10 +481,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
connectionRead = connectionsHandler.updateConnection(connectionPatch);

// detect if any streams need to be reset based on the patch and initial catalog, if so, reset them
// and fetch
// an up-to-date connectionRead
connectionRead = resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead);

resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead);
/*
* This catalog represents the full catalog that was used to create the configured catalog. It will
* have all streams that were present at the time. It will have no configuration set.
Expand All @@ -504,9 +502,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
* Given a fully updated connection, check for a diff between the old catalog and the updated
* catalog to see if any streams need to be reset.
*/
private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch,
final ConfiguredAirbyteCatalog oldConfiguredCatalog,
final ConnectionRead updatedConnectionRead)
private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch,
final ConfiguredAirbyteCatalog oldConfiguredCatalog,
final ConnectionRead updatedConnectionRead)
throws IOException, JsonValidationException, ConfigNotFoundException {

final UUID connectionId = webBackendConnectionPatch.getConnectionId();
Expand Down Expand Up @@ -535,13 +533,8 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web
eventRunner.resetConnection(
connectionId,
streamsToReset, true);

// return updated connectionRead after reset
return connectionsHandler.getConnection(connectionId);
}
}
// if no reset was necessary, return the connectionRead without changes
return updatedConnectionRead;
}

private List<UUID> createOperations(final WebBackendConnectionCreate webBackendConnectionCreate)
Expand Down
Loading