From dc61e078b3ba4e90d88d05e3a7b6f459c8fb7be7 Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 27 Oct 2022 11:28:58 -0700 Subject: [PATCH 01/11] update discover schema endpoint to calculate diff --- airbyte-api/src/main/openapi/config.yaml | 7 ++ .../airbyte/server/apis/ConfigurationApi.java | 13 +-- .../server/handlers/SchedulerHandler.java | 48 +++++++++- .../WebBackendConnectionsHandler.java | 18 ++-- .../server/handlers/SchedulerHandlerTest.java | 5 +- .../WebBackendConnectionsHandlerTest.java | 9 +- .../api/generated-api-html/index.html | 93 +++++++++++++++++++ .../examples/airbyte.local/openapi.yaml | 85 +++++++++++++++++ 8 files changed, 254 insertions(+), 24 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 69adc4af4cb6a..aa2a885c86b61 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2822,6 +2822,9 @@ components: properties: sourceId: $ref: "#/components/schemas/SourceId" + connectionId: + type: string + format: uuid disable_cache: type: boolean SourceUpdate: @@ -2883,6 +2886,8 @@ components: catalogId: type: string format: uuid + catalogDiff: + $ref: "#/components/schemas/CatalogDiff" SourceSearch: type: object properties: @@ -3375,6 +3380,8 @@ components: type: boolean nonBreakingChangesPreference: $ref: "#/components/schemas/NonBreakingChangesPreference" + breakingChange: + type: boolean WebBackendConnectionUpdate: type: object description: Used to apply a patch-style update to a connection, which means that null properties remain unchanged diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 6bc240c92a41e..e4e9f455c7e99 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -184,6 +184,12 @@ public ConfigurationApi(final ConfigRepository configRepository, final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence); + connectionsHandler = new ConnectionsHandler( + configRepository, + workspaceHelper, + trackingClient, + eventRunner); + schedulerHandler = new SchedulerHandler( configRepository, secretsRepositoryReader, @@ -192,14 +198,9 @@ public ConfigurationApi(final ConfigRepository configRepository, jobPersistence, workerEnvironment, logConfigs, - eventRunner); + eventRunner, connectionsHandler); stateHandler = new StateHandler(statePersistence); - connectionsHandler = new ConnectionsHandler( - configRepository, - workspaceHelper, - trackingClient, - eventRunner); sourceHandler = new SourceHandler( configRepository, secretsRepositoryReader, diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 8ec0e99fedd33..d8bf887b993c8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -11,9 +11,11 @@ import com.google.common.hash.Hashing; import io.airbyte.api.model.generated.AdvancedAuth; import io.airbyte.api.model.generated.AuthSpecification; +import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; import io.airbyte.api.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.DestinationDefinitionSpecificationRead; @@ -31,6 +33,7 @@ import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceUpdate; +import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; @@ -70,6 +73,8 @@ import java.util.ArrayList; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -80,6 +85,7 @@ public class SchedulerHandler { private static final ImmutableSet VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET = ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING); + private final ConnectionsHandler connectionsHandler; private final ConfigRepository configRepository; private final SecretsRepositoryWriter secretsRepositoryWriter; private final SynchronousSchedulerClient synchronousSchedulerClient; @@ -96,7 +102,8 @@ public SchedulerHandler(final ConfigRepository configRepository, final JobPersistence jobPersistence, final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, - final EventRunner eventRunner) { + final EventRunner eventRunner, + final ConnectionsHandler connectionsHandler) { this( configRepository, secretsRepositoryWriter, @@ -105,7 +112,8 @@ public SchedulerHandler(final ConfigRepository configRepository, new JsonSchemaValidator(), jobPersistence, eventRunner, - new JobConverter(workerEnvironment, logConfigs)); + new JobConverter(workerEnvironment, logConfigs), + connectionsHandler); } @VisibleForTesting @@ -116,7 +124,8 @@ public SchedulerHandler(final ConfigRepository configRepository, final JsonSchemaValidator jsonSchemaValidator, final JobPersistence jobPersistence, final EventRunner eventRunner, - final JobConverter jobConverter) { + final JobConverter jobConverter, + final ConnectionsHandler connectionsHandler) { this.configRepository = configRepository; this.secretsRepositoryWriter = secretsRepositoryWriter; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -125,6 +134,7 @@ public SchedulerHandler(final ConfigRepository configRepository, this.jobPersistence = jobPersistence; this.eventRunner = eventRunner; this.jobConverter = jobConverter; + this.connectionsHandler = connectionsHandler; } public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody) @@ -230,7 +240,22 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source if (currentCatalog.isEmpty() || bustActorCatalogCache) { final SynchronousResponse persistedCatalogId = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion, new Version(sourceDef.getProtocolVersion())); - return retrieveDiscoveredSchema(persistedCatalogId); + final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId); + + if (discoverSchemaRequestBody.getConnectionId() != null) { + final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler + .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); + io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = + connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); + CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), + CatalogConverter.toProtocol(currentAirbyteCatalog)); + if (containsBreakingChange(diff)) { + connectionsHandler.updateConnection(new ConnectionUpdate().breakingChange(true)); + } + discoveredSchema.catalogDiff(diff); + } + + return discoveredSchema; } final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class); final SynchronousJobRead emptyJob = new SynchronousJobRead() @@ -409,4 +434,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio return jobConverter.getJobInfoRead(job); } + private boolean containsBreakingChange(final CatalogDiff diff) { + AtomicBoolean isBreaking = new AtomicBoolean(false); + diff.getTransforms().stream().forEach(streamTransform -> { + if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { + return; + } + streamTransform.getUpdateStream().stream().forEach(fieldTransform -> { + if (fieldTransform.getBreaking()) { + isBreaking.set(true); + } + }); + }); + return isBreaking.get(); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 612a1bbae5eba..84d02bcab8443 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -328,7 +328,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti */ final Optional refreshedCatalog; if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) { - refreshedCatalog = getRefreshedSchema(connection.getSourceId()); + refreshedCatalog = getRefreshedSchema(connection.getSourceId(), connection.getConnectionId()); } else { refreshedCatalog = Optional.empty(); } @@ -337,6 +337,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti final AirbyteCatalog syncCatalog; final Optional currentSourceCatalogId = Optional.ofNullable(connection.getSourceCatalogId()); if (refreshedCatalog.isPresent()) { + log.info("refreshsed catalog is present"); connection.sourceCatalogId(refreshedCatalog.get().getCatalogId()); /* * constructs a full picture of all existing configured + all new / updated streams in the newest @@ -351,8 +352,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti * but was present at time of configuration will appear in the diff as an added stream which is * confusing. We need to figure out why source_catalog_id is not always populated in the db. */ - diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog(), - CatalogConverter.toProtocol(configuredCatalog)); + diff = refreshedCatalog.get().getCatalogDiff(); } else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // reconstructs a full picture of the full schema at the time the catalog was configured. @@ -370,12 +370,15 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff); } - private Optional getRefreshedSchema(final UUID sourceId) + private Optional getRefreshedSchema(final UUID sourceId, final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody() .sourceId(sourceId) - .disableCache(true); - return Optional.ofNullable(schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq)); + .disableCache(true) + .connectionId(connectionId); + SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); + log.info("schema read returned from discover schema is: " + schemaRead); + return Optional.ofNullable(schemaRead); } /** @@ -480,10 +483,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne connectionRead = connectionsHandler.updateConnection(connectionPatch); // detect if any streams need to be reset based on the patch and initial catalog, if so, reset them - // and fetch - // an up-to-date connectionRead connectionRead = resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead); - /* * This catalog represents the full catalog that was used to create the configured catalog. It will * have all streams that were present at the time. It will have no configuration set. diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index a8429a52e44fd..1083eae6a85dd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -134,6 +134,7 @@ class SchedulerHandlerTest { private JobPersistence jobPersistence; private EventRunner eventRunner; private JobConverter jobConverter; + private ConnectionsHandler connectionsHandler; @BeforeEach void setup() { @@ -150,6 +151,7 @@ void setup() { secretsRepositoryWriter = mock(SecretsRepositoryWriter.class); jobPersistence = mock(JobPersistence.class); eventRunner = mock(EventRunner.class); + connectionsHandler = mock(ConnectionsHandler.class); jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY)); @@ -161,7 +163,8 @@ void setup() { jsonSchemaValidator, jobPersistence, eventRunner, - jobConverter); + jobConverter, + connectionsHandler); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index a311d987e9541..a391999836823 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -403,15 +403,16 @@ WebBackendConnectionRead testWebBackendGetConnection(final boolean withCatalogRe @Test void testWebBackendGetConnectionWithDiscoveryAndNewSchema() throws ConfigNotFoundException, IOException, JsonValidationException { - when(connectionsHandler.getDiff(any(), any(), - any())).thenReturn(expectedWithNewSchema.getCatalogDiff()); final UUID newCatalogId = UUID.randomUUID(); when(configRepository.getMostRecentActorCatalogFetchEventForSource(any())) .thenReturn(Optional.of(new ActorCatalogFetchEvent().withActorCatalogId(newCatalogId))); when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); + SourceDiscoverSchemaRead schemaRead = + new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()); + when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); + final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, operationReadList); - verify(schedulerHandler).discoverSchemaForSourceFromSourceId(any()); assertEquals(expectedWithNewSchema, result); } @@ -560,7 +561,7 @@ void testForConnectionPatchCompleteness() { final Set handledMethods = Set.of("schedule", "connectionId", "syncCatalog", "namespaceDefinition", "namespaceFormat", "prefix", "status", "operationIds", "addOperationIdsItem", "removeOperationIdsItem", "resourceRequirements", "name", - "sourceCatalogId", "scheduleType", "scheduleData", "geography", "notifySchemaChanges", "nonBreakingChangesPreference"); + "sourceCatalogId", "scheduleType", "scheduleData", "geography", "breakingChange", "notifySchemaChanges", "nonBreakingChangesPreference"); final Set methods = Arrays.stream(ConnectionUpdate.class.getMethods()) .filter(method -> method.getReturnType() == ConnectionUpdate.class) diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 92bd602330f3d..b78e5a680eb71 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -6133,6 +6133,51 @@

Example data

"logLines" : [ "logLines", "logLines" ] }, "succeeded" : true + }, + "catalogDiff" : { + "transforms" : [ { + "streamDescriptor" : { + "name" : "name", + "namespace" : "namespace" + }, + "transformType" : "add_stream", + "updateStream" : [ { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + }, { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + } ] + }, { + "streamDescriptor" : { + "name" : "name", + "namespace" : "namespace" + }, + "transformType" : "add_stream", + "updateStream" : [ { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + }, { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + } ] + } ] } } @@ -6547,6 +6592,51 @@

Example data

"logLines" : [ "logLines", "logLines" ] }, "succeeded" : true + }, + "catalogDiff" : { + "transforms" : [ { + "streamDescriptor" : { + "name" : "name", + "namespace" : "namespace" + }, + "transformType" : "add_stream", + "updateStream" : [ { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + }, { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + } ] + }, { + "streamDescriptor" : { + "name" : "name", + "namespace" : "namespace" + }, + "transformType" : "add_stream", + "updateStream" : [ { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + }, { + "updateFieldSchema" : { }, + "fieldName" : [ "fieldName", "fieldName" ], + "addField" : { }, + "transformType" : "add_field", + "removeField" : { }, + "breaking" : true + } ] + } ] } } @@ -10538,6 +10628,7 @@

ConnectionUpdate - geography (optional)
notifySchemaChanges (optional)
nonBreakingChangesPreference (optional)
+
breakingChange (optional)
@@ -11421,6 +11512,7 @@

SourceDiscoverSchemaRead - <
catalog (optional)
jobInfo
catalogId (optional)
UUID format: uuid
+
catalogDiff (optional)

@@ -11428,6 +11520,7 @@

SourceDiscoverSchemaRequestB
sourceId
UUID format: uuid
+
connectionId (optional)
UUID format: uuid
disable_cache (optional)

diff --git a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml index 99d10f2ad9c9a..d22f71a9f70fb 100644 --- a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml +++ b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml @@ -2489,6 +2489,8 @@ components: $ref: "#/components/schemas/AirbyteCatalog" jobInfo: $ref: "#/components/schemas/SynchronousJobRead" + catalogDiff: + $ref: "#/components/schemas/CatalogDiff" required: - jobInfo type: object @@ -2891,8 +2893,91 @@ components: - news - securityUpdates type: object + CatalogDiff: + type: object + description: Describes the difference between two Airbyte catalogs. + required: + - transforms + properties: + transforms: + description: list of stream transformations. order does not matter. + type: array + items: + $ref: "#/components/schemas/StreamTransform" + StreamTransform: + type: object + required: + - transformType + - streamDescriptor + properties: + transformType: + type: string + enum: + - add_stream + - remove_stream + - update_stream + streamDescriptor: + $ref: "#/components/schemas/StreamDescriptor" + updateStream: + type: array + description: list of field transformations. order does not matter. + items: + $ref: "#/components/schemas/FieldTransform" + FieldTransform: + type: object + description: "Describes the difference between two Streams." + required: + - transformType + - fieldName + - breaking + properties: + transformType: + type: string + enum: + - add_field + - remove_field + - update_field_schema + fieldName: + $ref: "#/components/schemas/FieldName" + breaking: + type: boolean + addField: + $ref: "#/components/schemas/FieldAdd" + removeField: + $ref: "#/components/schemas/FieldRemove" + updateFieldSchema: + $ref: "#/components/schemas/FieldSchemaUpdate" + FieldAdd: + type: object + properties: + schema: + $ref: "#/components/schemas/FieldSchema" + FieldRemove: + type: object + properties: + schema: + $ref: "#/components/schemas/FieldSchema" + FieldSchemaUpdate: + type: object + required: + - oldSchema + - newSchema + properties: + oldSchema: + $ref: "#/components/schemas/FieldSchema" + newSchema: + $ref: "#/components/schemas/FieldSchema" + FieldName: + description: A field name is a list of strings that form the path to the field. + type: array + items: + type: string + FieldSchema: + description: JSONSchema representation of the field + type: object securitySchemes: bearerAuth: bearerFormat: JWT scheme: bearer type: http + From 80b576718d78cea0894d06fb6090af8a4d1ab3e4 Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 27 Oct 2022 12:23:27 -0700 Subject: [PATCH 02/11] remove logs --- .../handlers/WebBackendConnectionsHandler.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 84d02bcab8443..037186bee3d21 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -337,7 +337,6 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti final AirbyteCatalog syncCatalog; final Optional currentSourceCatalogId = Optional.ofNullable(connection.getSourceCatalogId()); if (refreshedCatalog.isPresent()) { - log.info("refreshsed catalog is present"); connection.sourceCatalogId(refreshedCatalog.get().getCatalogId()); /* * constructs a full picture of all existing configured + all new / updated streams in the newest @@ -377,7 +376,6 @@ private Optional getRefreshedSchema(final UUID sourceI .disableCache(true) .connectionId(connectionId); SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); - log.info("schema read returned from discover schema is: " + schemaRead); return Optional.ofNullable(schemaRead); } @@ -483,7 +481,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne connectionRead = connectionsHandler.updateConnection(connectionPatch); // detect if any streams need to be reset based on the patch and initial catalog, if so, reset them - connectionRead = resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead); + resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead); /* * This catalog represents the full catalog that was used to create the configured catalog. It will * have all streams that were present at the time. It will have no configuration set. @@ -504,9 +502,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne * Given a fully updated connection, check for a diff between the old catalog and the updated * catalog to see if any streams need to be reset. */ - private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch, - final ConfiguredAirbyteCatalog oldConfiguredCatalog, - final ConnectionRead updatedConnectionRead) + private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch, + final ConfiguredAirbyteCatalog oldConfiguredCatalog, + final ConnectionRead updatedConnectionRead) throws IOException, JsonValidationException, ConfigNotFoundException { final UUID connectionId = webBackendConnectionPatch.getConnectionId(); @@ -535,13 +533,8 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web eventRunner.resetConnection( connectionId, streamsToReset, true); - - // return updated connectionRead after reset - return connectionsHandler.getConnection(connectionId); } } - // if no reset was necessary, return the connectionRead without changes - return updatedConnectionRead; } private List createOperations(final WebBackendConnectionCreate webBackendConnectionCreate) From 74353b8d753f0d7895bec41d1bffc2567bfce57f Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 27 Oct 2022 12:29:27 -0700 Subject: [PATCH 03/11] add connection id --- .../main/java/io/airbyte/server/handlers/SchedulerHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index d8bf887b993c8..5b3881bf77442 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -250,7 +250,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), CatalogConverter.toProtocol(currentAirbyteCatalog)); if (containsBreakingChange(diff)) { - connectionsHandler.updateConnection(new ConnectionUpdate().breakingChange(true)); + connectionsHandler.updateConnection(new ConnectionUpdate().breakingChange(true).connectionId(discoverSchemaRequestBody.getConnectionId())); } discoveredSchema.catalogDiff(diff); } From 0855b5f62fb84d1a9b2cc51c27bd12eed990ca7a Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 27 Oct 2022 16:05:51 -0700 Subject: [PATCH 04/11] tests --- .../server/handlers/SchedulerHandlerTest.java | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 1083eae6a85dd..fd5344eb24133 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -20,13 +20,18 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.Lists; +import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.model.generated.ConnectionRead; +import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.DestinationDefinitionSpecificationRead; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationUpdate; +import io.airbyte.api.model.generated.FieldTransform; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.SourceCoreConfig; @@ -36,6 +41,8 @@ import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceUpdate; +import io.airbyte.api.model.generated.StreamTransform; +import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; @@ -67,6 +74,7 @@ import io.airbyte.server.converters.ConfigurationUpdate; import io.airbyte.server.converters.JobConverter; import io.airbyte.server.errors.ValueConflictKnownException; +import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.server.helpers.DestinationHelpers; import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.server.scheduler.EventRunner; @@ -520,6 +528,103 @@ void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonVal verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION)); } + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("dogs")); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream("dogs", Field.of("name", JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + } + + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("dogs")).addUpdateStreamItem(new FieldTransform().transformType( + FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream("dogs", Field.of("name", JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate().connectionId(connectionId).breakingChange(true); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + verify(connectionsHandler).updateConnection(expectedConnectionUpdate); + } + @Test void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException { final SourceConnection source = new SourceConnection() From 73d8d01974fdb3acaba1492b10cdbd8ddda4e1c1 Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 27 Oct 2022 16:27:29 -0700 Subject: [PATCH 05/11] one liner for checking field transforms --- .../java/io/airbyte/server/apis/ConfigurationApi.java | 3 ++- .../java/io/airbyte/server/handlers/SchedulerHandler.java | 8 +++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index e4e9f455c7e99..947ea4f9591cd 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -198,7 +198,8 @@ public ConfigurationApi(final ConfigRepository configRepository, jobPersistence, workerEnvironment, logConfigs, - eventRunner, connectionsHandler); + eventRunner, + connectionsHandler); stateHandler = new StateHandler(statePersistence); sourceHandler = new SourceHandler( diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 5b3881bf77442..8a4c8a10b1ba0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -440,11 +440,9 @@ private boolean containsBreakingChange(final CatalogDiff diff) { if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { return; } - streamTransform.getUpdateStream().stream().forEach(fieldTransform -> { - if (fieldTransform.getBreaking()) { - isBreaking.set(true); - } - }); + + boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(fieldTransform -> fieldTransform.getBreaking()); + isBreaking.set(anyBreakingFieldTransforms); }); return isBreaking.get(); } From f47d0ae976149dfb9e8aef48d6e9531bb8fb188c Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 27 Oct 2022 16:47:49 -0700 Subject: [PATCH 06/11] pmd --- .../server/handlers/SchedulerHandlerTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index fd5344eb24133..2eb3e9be56d7c 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -104,6 +104,8 @@ class SchedulerHandlerTest { private static final String DESTINATION_DOCKER_TAG = "tag"; private static final String DESTINATION_DOCKER_IMAGE = DockerUtils.getTaggedImageName(DESTINATION_DOCKER_REPO, DESTINATION_DOCKER_TAG); private static final String DESTINATION_PROTOCOL_VERSION = "0.7.9"; + private static final String NAME = "name"; + private static final String DOGS = "dogs"; private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", Field.of("sku", JsonSchemaType.STRING)); @@ -265,7 +267,7 @@ void testGetSourceSpec() throws JsonValidationException, IOException, ConfigNotF new SourceDefinitionIdWithWorkspaceId().sourceDefinitionId(UUID.randomUUID()).workspaceId(UUID.randomUUID()); final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() - .withName("name") + .withName(NAME) .withDockerRepository(SOURCE_DOCKER_REPO) .withDockerImageTag(SOURCE_DOCKER_TAG) .withSourceDefinitionId(sourceDefinitionIdWithWorkspaceId.getSourceDefinitionId()) @@ -285,7 +287,7 @@ void testGetDestinationSpec() throws JsonValidationException, IOException, Confi new DestinationDefinitionIdWithWorkspaceId().destinationDefinitionId(UUID.randomUUID()).workspaceId(UUID.randomUUID()); final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() - .withName("name") + .withName(NAME) .withDockerRepository(DESTINATION_DOCKER_REPO) .withDockerImageTag(DESTINATION_DOCKER_TAG) .withDestinationDefinitionId(destinationDefinitionIdWithWorkspaceId.getDestinationDefinitionId()) @@ -537,7 +539,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) - .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("dogs")); + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) .thenReturn(new StandardSourceDefinition() @@ -554,7 +556,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), - CatalogHelpers.createAirbyteStream("dogs", Field.of("name", JsonSchemaType.STRING)))); + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); @@ -584,7 +586,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) - .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("dogs")).addUpdateStreamItem(new FieldTransform().transformType( + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)).addUpdateStreamItem(new FieldTransform().transformType( FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) @@ -602,7 +604,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), - CatalogHelpers.createAirbyteStream("dogs", Field.of("name", JsonSchemaType.STRING)))); + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); From feb2b520d5d5dff045eb34069fa5f3888eb79873 Mon Sep 17 00:00:00 2001 From: alovew Date: Fri, 28 Oct 2022 13:15:39 -0700 Subject: [PATCH 07/11] add breakingChange to SourceDiscoverSchemaRead object --- airbyte-api/src/main/openapi/config.yaml | 2 ++ .../java/io/airbyte/server/ServerApp.java | 3 ++- .../server/handlers/ConnectionsHandler.java | 4 +++ .../server/handlers/SchedulerHandler.java | 27 ++++++++++++------- .../WebBackendConnectionsHandler.java | 2 +- .../WebBackendConnectionsHandlerTest.java | 3 ++- .../api/generated-api-html/index.html | 3 +++ .../examples/airbyte.local/openapi.yaml | 2 ++ 8 files changed, 33 insertions(+), 13 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index aa2a885c86b61..18f7c136de42b 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2888,6 +2888,8 @@ components: format: uuid catalogDiff: $ref: "#/components/schemas/CatalogDiff" + breakingChange: + type: boolean SourceSearch: type: object properties: diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 8ec8ac60b8248..8bdf4ceb44d3c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -293,7 +293,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, jobPersistence, configs.getWorkerEnvironment(), configs.getLogConfigs(), - eventRunner); + eventRunner, + connectionsHandler); final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index df0850e2c97dc..fe1df0a0dfd71 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -343,6 +343,10 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn if (patch.getGeography() != null) { sync.setGeography(ApiPojoConverters.toPersistenceGeography(patch.getGeography())); } + + if (patch.getBreakingChange() != null) { + sync.setBreakingChange(patch.getBreakingChange()); + } } private void validateConnectionPatch(final WorkspaceHelper workspaceHelper, final StandardSync persistedSync, final ConnectionUpdate patch) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 8a4c8a10b1ba0..d70c74f2fa5e9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -243,16 +243,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId); if (discoverSchemaRequestBody.getConnectionId() != null) { - final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler - .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); - io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = - connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); - CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), - CatalogConverter.toProtocol(currentAirbyteCatalog)); - if (containsBreakingChange(diff)) { - connectionsHandler.updateConnection(new ConnectionUpdate().breakingChange(true).connectionId(discoverSchemaRequestBody.getConnectionId())); - } - discoveredSchema.catalogDiff(diff); + discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody); } return discoveredSchema; @@ -364,6 +355,22 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE return submitCancellationToWorker(jobIdRequestBody.getId()); } + private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) + throws JsonValidationException, ConfigNotFoundException, IOException { + final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler + .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); + io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = + connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); + CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), + CatalogConverter.toProtocol(currentAirbyteCatalog)); + boolean containsBreakingChange = containsBreakingChange(diff); + ConnectionUpdate updateObject = + new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); + connectionsHandler.updateConnection(updateObject); + discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange); + + } + private CheckConnectionRead reportConnectionStatus(final SynchronousResponse response) { final CheckConnectionRead checkConnectionRead = new CheckConnectionRead() .jobInfo(jobConverter.getSynchronousJobRead(response)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 037186bee3d21..9931d3665f939 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -352,7 +352,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti * confusing. We need to figure out why source_catalog_id is not always populated in the db. */ diff = refreshedCatalog.get().getCatalogDiff(); - + connection.setBreakingChange(refreshedCatalog.get().getBreakingChange()); } else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // reconstructs a full picture of the full schema at the time the catalog was configured. syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index a391999836823..2ddde202d1777 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -408,7 +408,8 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchema() throws ConfigNotFoun .thenReturn(Optional.of(new ActorCatalogFetchEvent().withActorCatalogId(newCatalogId))); when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = - new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()); + new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) + .breakingChange(false); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index b78e5a680eb71..1d2e9687540ce 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -6091,6 +6091,7 @@

Example data

Content-Type: application/json
{
   "catalogId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
+  "breakingChange" : true,
   "catalog" : {
     "streams" : [ {
       "stream" : {
@@ -6550,6 +6551,7 @@ 

Example data

Content-Type: application/json
{
   "catalogId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
+  "breakingChange" : true,
   "catalog" : {
     "streams" : [ {
       "stream" : {
@@ -11513,6 +11515,7 @@ 

SourceDiscoverSchemaRead - <
jobInfo
catalogId (optional)
UUID format: uuid
catalogDiff (optional)
+
breakingChange (optional)
diff --git a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml index d22f71a9f70fb..30c32da65c219 100644 --- a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml +++ b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml @@ -2491,6 +2491,8 @@ components: $ref: "#/components/schemas/SynchronousJobRead" catalogDiff: $ref: "#/components/schemas/CatalogDiff" + breakingChange: + type: boolean required: - jobInfo type: object From 2e2dddeaf4cde7916553ff61e889127a96258117 Mon Sep 17 00:00:00 2001 From: alovew Date: Fri, 28 Oct 2022 13:24:23 -0700 Subject: [PATCH 08/11] test for connection with breaking change after discovery --- .../WebBackendConnectionsHandlerTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 2ddde202d1777..4df0961d64fa8 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -121,6 +121,7 @@ class WebBackendConnectionsHandlerTest { private OperationReadList brokenOperationReadList; private WebBackendConnectionRead expected; private WebBackendConnectionRead expectedWithNewSchema; + private WebBackendConnectionRead expectedWithNewSchemaAndBreakingChange; private WebBackendConnectionRead expectedWithNewSchemaBroken; private WebBackendConnectionRead expectedNoDiscoveryWithNewSchema; private EventRunner eventRunner; @@ -275,6 +276,13 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) .updateStream(null)))); + expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(connectionRead, sourceRead, destinationRead, + new OperationReadList().operations(expected.getOperations()), SchemaChange.BREAKING, now, modifiedCatalog, null) + .catalogDiff(new CatalogDiff().transforms(List.of( + new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) + .updateStream(null)))); + expectedWithNewSchemaBroken = expectedWebBackendConnectionReadObject(brokenConnectionRead, sourceRead, destinationRead, brokenOperationReadList, SchemaChange.BREAKING, now, connectionRead.getSyncCatalog(), brokenConnectionRead.getSourceCatalogId()); when(schedulerHandler.resetConnection(any(ConnectionIdRequestBody.class))) @@ -417,6 +425,23 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchema() throws ConfigNotFoun assertEquals(expectedWithNewSchema, result); } + @Test + void testWebBackendGetConnectionWithDiscoveryAndNewSchemaBreakingChange() throws ConfigNotFoundException, + IOException, JsonValidationException { + final UUID newCatalogId = UUID.randomUUID(); + when(configRepository.getMostRecentActorCatalogFetchEventForSource(any())) + .thenReturn(Optional.of(new ActorCatalogFetchEvent().withActorCatalogId(newCatalogId))); + when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); + SourceDiscoverSchemaRead schemaRead = + new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) + .breakingChange(true); + when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); + + final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, + operationReadList); + assertEquals(expectedWithNewSchemaAndBreakingChange, result); + } + @Test void testWebBackendGetConnectionNoRefreshCatalog() throws JsonValidationException, ConfigNotFoundException, IOException { From badd99c873e24ba4391cbb299fab9f63585c80cf Mon Sep 17 00:00:00 2001 From: alovew Date: Fri, 28 Oct 2022 14:40:22 -0700 Subject: [PATCH 09/11] format --- .../server/handlers/WebBackendConnectionsHandlerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 4df0961d64fa8..c718a9c518c9a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -278,10 +278,10 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(connectionRead, sourceRead, destinationRead, new OperationReadList().operations(expected.getOperations()), SchemaChange.BREAKING, now, modifiedCatalog, null) - .catalogDiff(new CatalogDiff().transforms(List.of( - new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) - .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) - .updateStream(null)))); + .catalogDiff(new CatalogDiff().transforms(List.of( + new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) + .updateStream(null)))); expectedWithNewSchemaBroken = expectedWebBackendConnectionReadObject(brokenConnectionRead, sourceRead, destinationRead, brokenOperationReadList, SchemaChange.BREAKING, now, connectionRead.getSyncCatalog(), brokenConnectionRead.getSourceCatalogId()); From 697f20262c269d10ec7271e8f02d1a11b7d7ec67 Mon Sep 17 00:00:00 2001 From: alovew Date: Fri, 28 Oct 2022 16:11:31 -0700 Subject: [PATCH 10/11] final variable --- .../main/java/io/airbyte/server/handlers/SchedulerHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index d70c74f2fa5e9..b119be162999a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -359,7 +359,7 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); - io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = + final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), CatalogConverter.toProtocol(currentAirbyteCatalog)); From 9b47e6aec9745ca10a2b61fc80b86f4dde2113bb Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 1 Nov 2022 11:52:08 -0700 Subject: [PATCH 11/11] use for loop --- .../server/handlers/SchedulerHandler.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index b119be162999a..49a4a75940e91 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -22,6 +22,7 @@ import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationSyncMode; import io.airbyte.api.model.generated.DestinationUpdate; +import io.airbyte.api.model.generated.FieldTransform; import io.airbyte.api.model.generated.JobConfigType; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; @@ -33,6 +34,7 @@ import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceUpdate; +import io.airbyte.api.model.generated.StreamTransform; import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.docker.DockerUtils; @@ -73,7 +75,6 @@ import java.util.ArrayList; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; @@ -442,16 +443,18 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio } private boolean containsBreakingChange(final CatalogDiff diff) { - AtomicBoolean isBreaking = new AtomicBoolean(false); - diff.getTransforms().stream().forEach(streamTransform -> { + for (StreamTransform streamTransform : diff.getTransforms()) { if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { - return; + continue; } - boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(fieldTransform -> fieldTransform.getBreaking()); - isBreaking.set(anyBreakingFieldTransforms); - }); - return isBreaking.get(); + boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); + if (anyBreakingFieldTransforms) { + return true; + } + } + + return false; } }