-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DiscoverSchema endpoints calculates diff and breaking change #18571
Changes from 11 commits
dc61e07
80b5767
74353b8
0855b5f
73d8d01
f47d0ae
feb2b52
2e2ddde
badd99c
697f202
9b47e6a
312e792
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,15 +11,18 @@ | |
import com.google.common.hash.Hashing; | ||
import io.airbyte.api.model.generated.AdvancedAuth; | ||
import io.airbyte.api.model.generated.AuthSpecification; | ||
import io.airbyte.api.model.generated.CatalogDiff; | ||
import io.airbyte.api.model.generated.CheckConnectionRead; | ||
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; | ||
import io.airbyte.api.model.generated.ConnectionIdRequestBody; | ||
import io.airbyte.api.model.generated.ConnectionUpdate; | ||
import io.airbyte.api.model.generated.DestinationCoreConfig; | ||
import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; | ||
import io.airbyte.api.model.generated.DestinationDefinitionSpecificationRead; | ||
import io.airbyte.api.model.generated.DestinationIdRequestBody; | ||
import io.airbyte.api.model.generated.DestinationSyncMode; | ||
import io.airbyte.api.model.generated.DestinationUpdate; | ||
import io.airbyte.api.model.generated.FieldTransform; | ||
import io.airbyte.api.model.generated.JobConfigType; | ||
import io.airbyte.api.model.generated.JobIdRequestBody; | ||
import io.airbyte.api.model.generated.JobInfoRead; | ||
|
@@ -31,6 +34,8 @@ | |
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; | ||
import io.airbyte.api.model.generated.SourceIdRequestBody; | ||
import io.airbyte.api.model.generated.SourceUpdate; | ||
import io.airbyte.api.model.generated.StreamTransform; | ||
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; | ||
import io.airbyte.api.model.generated.SynchronousJobRead; | ||
import io.airbyte.commons.docker.DockerUtils; | ||
import io.airbyte.commons.enums.Enums; | ||
|
@@ -70,6 +75,7 @@ | |
import java.util.ArrayList; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import javax.validation.constraints.NotNull; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
|
@@ -80,6 +86,7 @@ public class SchedulerHandler { | |
private static final ImmutableSet<ErrorCode> VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET = | ||
ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING); | ||
|
||
private final ConnectionsHandler connectionsHandler; | ||
private final ConfigRepository configRepository; | ||
private final SecretsRepositoryWriter secretsRepositoryWriter; | ||
private final SynchronousSchedulerClient synchronousSchedulerClient; | ||
|
@@ -96,7 +103,8 @@ public SchedulerHandler(final ConfigRepository configRepository, | |
final JobPersistence jobPersistence, | ||
final WorkerEnvironment workerEnvironment, | ||
final LogConfigs logConfigs, | ||
final EventRunner eventRunner) { | ||
final EventRunner eventRunner, | ||
final ConnectionsHandler connectionsHandler) { | ||
this( | ||
configRepository, | ||
secretsRepositoryWriter, | ||
|
@@ -105,7 +113,8 @@ public SchedulerHandler(final ConfigRepository configRepository, | |
new JsonSchemaValidator(), | ||
jobPersistence, | ||
eventRunner, | ||
new JobConverter(workerEnvironment, logConfigs)); | ||
new JobConverter(workerEnvironment, logConfigs), | ||
connectionsHandler); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -116,7 +125,8 @@ public SchedulerHandler(final ConfigRepository configRepository, | |
final JsonSchemaValidator jsonSchemaValidator, | ||
final JobPersistence jobPersistence, | ||
final EventRunner eventRunner, | ||
final JobConverter jobConverter) { | ||
final JobConverter jobConverter, | ||
final ConnectionsHandler connectionsHandler) { | ||
this.configRepository = configRepository; | ||
this.secretsRepositoryWriter = secretsRepositoryWriter; | ||
this.synchronousSchedulerClient = synchronousSchedulerClient; | ||
|
@@ -125,6 +135,7 @@ public SchedulerHandler(final ConfigRepository configRepository, | |
this.jobPersistence = jobPersistence; | ||
this.eventRunner = eventRunner; | ||
this.jobConverter = jobConverter; | ||
this.connectionsHandler = connectionsHandler; | ||
} | ||
|
||
public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody) | ||
|
@@ -230,7 +241,13 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source | |
if (currentCatalog.isEmpty() || bustActorCatalogCache) { | ||
final SynchronousResponse<UUID> persistedCatalogId = | ||
synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion, new Version(sourceDef.getProtocolVersion())); | ||
return retrieveDiscoveredSchema(persistedCatalogId); | ||
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId); | ||
|
||
if (discoverSchemaRequestBody.getConnectionId() != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When can this be null? Should we log an error if so? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe this endpoint is called when connections are created in order to discover schemas, and in this case we don't need to pass in the connection id since there is not an existing sync catalog to compare anything against.
gosusnp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody); | ||
} | ||
|
||
return discoveredSchema; | ||
} | ||
final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class); | ||
final SynchronousJobRead emptyJob = new SynchronousJobRead() | ||
|
@@ -339,6 +356,22 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE | |
return submitCancellationToWorker(jobIdRequestBody.getId()); | ||
} | ||
|
||
private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) | ||
throws JsonValidationException, ConfigNotFoundException, IOException { | ||
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler | ||
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); | ||
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = | ||
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); | ||
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not for this review: The getDiff could be part of the CatalogHelper. It doesn't interact with the persistence layer. The goal would be to avoid having a handler depend on another one. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the use case for having a catalog fallback? This feels like something that could be worth documenting here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I just copy-pasted this to move it from WebBackendConnectionsHandler, so I'm not 100% sure. I think it might be in case we can't find a source catalog for a particular connection, then we fall back on the configured airbyte catalog that exists for that connection, but I'm not sure about the case where that would actually happen. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In this case, can we move this code to some shared helper somewhere? Spreading unclear logic feels like a trap for our future selves. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @benmoriceau made a comment above to move getDiff to the CatalogsHelper: "Not for this review: The getDiff could be part of the CatalogHelper. It doesn't interact with the persistence layer. The goal would be to avoid having a handler depend on another one." Could we do that in a follow-up, since I think it will be a larger change? I could add it here though; it will just be a big PR. There was a problem hiding this comment. 
Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm OK with the follow-up. Do we have a ticket for it? |
||
CatalogConverter.toProtocol(currentAirbyteCatalog)); | ||
boolean containsBreakingChange = containsBreakingChange(diff); | ||
ConnectionUpdate updateObject = | ||
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); | ||
connectionsHandler.updateConnection(updateObject); | ||
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange); | ||
|
||
} | ||
|
||
private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<StandardCheckConnectionOutput> response) { | ||
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead() | ||
.jobInfo(jobConverter.getSynchronousJobRead(response)); | ||
|
@@ -409,4 +442,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio | |
return jobConverter.getJobInfoRead(job); | ||
} | ||
|
||
private boolean containsBreakingChange(final CatalogDiff diff) { | ||
for (StreamTransform streamTransform : diff.getTransforms()) { | ||
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { | ||
continue; | ||
} | ||
|
||
boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); | ||
if (anyBreakingFieldTransforms) { | ||
return true; | ||
} | ||
} | ||
|
||
return false; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -328,7 +328,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti | |
*/ | ||
final Optional<SourceDiscoverSchemaRead> refreshedCatalog; | ||
if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) { | ||
refreshedCatalog = getRefreshedSchema(connection.getSourceId()); | ||
refreshedCatalog = getRefreshedSchema(connection.getSourceId(), connection.getConnectionId()); | ||
} else { | ||
refreshedCatalog = Optional.empty(); | ||
} | ||
|
@@ -351,9 +351,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti | |
* but was present at time of configuration will appear in the diff as an added stream which is | ||
* confusing. We need to figure out why source_catalog_id is not always populated in the db. | ||
*/ | ||
diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog(), | ||
CatalogConverter.toProtocol(configuredCatalog)); | ||
|
||
diff = refreshedCatalog.get().getCatalogDiff(); | ||
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange()); | ||
Comment on lines
+354
to
+355
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. on line 339 we have |
||
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { | ||
// reconstructs a full picture of the full schema at the time the catalog was configured. | ||
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get()); | ||
|
@@ -370,12 +369,14 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti | |
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff); | ||
} | ||
|
||
private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId) | ||
private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: since this function is actually going to fetch the schema every time, feels like |
||
throws JsonValidationException, ConfigNotFoundException, IOException { | ||
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody() | ||
.sourceId(sourceId) | ||
.disableCache(true); | ||
return Optional.ofNullable(schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq)); | ||
.disableCache(true) | ||
.connectionId(connectionId); | ||
SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); | ||
return Optional.ofNullable(schemaRead); | ||
} | ||
|
||
/** | ||
|
@@ -480,10 +481,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne | |
connectionRead = connectionsHandler.updateConnection(connectionPatch); | ||
|
||
// detect if any streams need to be reset based on the patch and initial catalog, if so, reset them | ||
// and fetch | ||
// an up-to-date connectionRead | ||
connectionRead = resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead); | ||
|
||
resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead); | ||
/* | ||
* This catalog represents the full catalog that was used to create the configured catalog. It will | ||
* have all streams that were present at the time. It will have no configuration set. | ||
|
@@ -504,9 +502,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne | |
* Given a fully updated connection, check for a diff between the old catalog and the updated | ||
gosusnp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* catalog to see if any streams need to be reset. | ||
*/ | ||
private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch, | ||
final ConfiguredAirbyteCatalog oldConfiguredCatalog, | ||
final ConnectionRead updatedConnectionRead) | ||
private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch, | ||
final ConfiguredAirbyteCatalog oldConfiguredCatalog, | ||
final ConnectionRead updatedConnectionRead) | ||
throws IOException, JsonValidationException, ConfigNotFoundException { | ||
|
||
final UUID connectionId = webBackendConnectionPatch.getConnectionId(); | ||
|
@@ -535,13 +533,8 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web | |
eventRunner.resetConnection( | ||
connectionId, | ||
streamsToReset, true); | ||
|
||
// return updated connectionRead after reset | ||
return connectionsHandler.getConnection(connectionId); | ||
} | ||
} | ||
// if no reset was necessary, return the connectionRead without changes | ||
return updatedConnectionRead; | ||
} | ||
|
||
private List<UUID> createOperations(final WebBackendConnectionCreate webBackendConnectionCreate) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for the scope of this review but to keep in mind for later: I think
discoverSchemaForSourceFromSourceId
should be moved to the SourceHandler. We should get rid of the scheduler handler at some point (we don't have any scheduler anymore).