From 54eba7e46c73efa64d5104543b7e1966d80fcd7b Mon Sep 17 00:00:00 2001
From: Michael Siega
Date: Thu, 29 Sep 2022 16:38:47 +0200
Subject: [PATCH 1/5] support large schema discovery

---
 .../resources/types/ConnectorJobOutput.yaml | 6 +-
 .../types/JobDiscoverCatalogConfig.yaml | 12 +++
 .../types/StandardDiscoverCatalogInput.yaml | 12 +++
 .../server/handlers/SchedulerHandler.java | 19 ++---
 .../DefaultSynchronousSchedulerClient.java | 17 ++++-
 .../scheduler/SynchronousSchedulerClient.java | 3 +-
 .../server/handlers/SchedulerHandlerTest.java | 74 ++++++++++---------
 ...DefaultSynchronousSchedulerClientTest.java | 15 ++--
 .../java/io/airbyte/workers/WorkerUtils.java | 2 +-
 .../general/DefaultDiscoverCatalogWorker.java | 29 +++++---
 .../workers/temporal/TemporalClient.java | 3 +-
 .../catalog/DiscoverCatalogActivityImpl.java | 12 ++-
 .../DefaultDiscoverCatalogWorkerTest.java | 43 ++++++++---
 tools/bin/make-big-schema.sh | 10 +++
 14 files changed, 169 insertions(+), 88 deletions(-)
 create mode 100755 tools/bin/make-big-schema.sh

diff --git a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml
index 7611cae277c09..48376f03848ad 100644
--- a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml
+++ b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml
@@ -12,12 +12,12 @@ properties:
     type: string
     enum:
       - checkConnection
-      - discoverCatalog
+      - discoverCatalogId
       - spec
   checkConnection:
     "$ref": StandardCheckConnectionOutput.yaml
-  discoverCatalog:
-    existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
+  discoverCatalogId:
+    type: string
   spec:
     existingJavaType: io.airbyte.protocol.models.ConnectorSpecification
   failureReason:
diff --git a/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml
index d5c3da44f67f9..406aa47f25131 100644
--- a/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml
+++ b/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml
@@ -8,6 +8,9 @@ additionalProperties: false
 required:
   - connectionConfiguration
   - dockerImage
+  - sourceId
+  - connectorVersion
+  - configHash
 properties:
   connectionConfiguration:
     description: Integration specific blob. Must be a valid JSON string.
     type: object
     existingJavaType: com.fasterxml.jackson.databind.JsonNode
   dockerImage:
     type: string
+  sourceId:
+    description: The ID of the source being discovered, so we can persist the result
+    type: string
+  connectorVersion:
+    description: Connector version
+    type: string
+  configHash:
+    description: Config hash
+    type: string
diff --git a/airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml b/airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml
index b0d3b9c738f8f..c80d480caddc3 100644
--- a/airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml
+++ b/airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml
@@ -6,9 +6,21 @@ description: information required for connection.
 type: object
 required:
   - connectionConfiguration
+  - sourceId
+  - connectorVersion
+  - configHash
 additionalProperties: false
 properties:
   connectionConfiguration:
     description: Integration specific blob. Must be a valid JSON string.
type: object existingJavaType: com.fasterxml.jackson.databind.JsonNode + sourceId: + description: The ID of the source being discovered, so we can persist the result + type: string + connectorVersion: + description: Connector version, so we can persist the result + type: string + configHash: + description: Config hash, so we can persist the result + type: string diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 23e54a184a1f6..768e0c841df2a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -220,14 +220,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source configRepository.getActorCatalog(discoverSchemaRequestBody.getSourceId(), connectorVersion, configHash); final boolean bustActorCatalogCache = discoverSchemaRequestBody.getDisableCache() != null && discoverSchemaRequestBody.getDisableCache(); if (currentCatalog.isEmpty() || bustActorCatalogCache) { - final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName); - final SourceDiscoverSchemaRead returnValue = discoverJobToOutput(response); - if (response.isSuccess()) { - final UUID catalogId = - configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash); - returnValue.catalogId(catalogId); - } - return returnValue; + final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion); + return discoverJobToOutput(response); } final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class); final SynchronousJobRead emptyJob = new SynchronousJobRead() @@ -257,16 +251,19 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So final SourceConnection source = new SourceConnection() .withSourceDefinitionId(sourceCreate.getSourceDefinitionId()) .withConfiguration(partialConfig); - final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName); + final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag()); return discoverJobToOutput(response); } - private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse response) { + private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse response) throws ConfigNotFoundException, IOException { final SourceDiscoverSchemaRead sourceDiscoverSchemaRead = new SourceDiscoverSchemaRead() .jobInfo(jobConverter.getSynchronousJobRead(response)); if (response.isSuccess()) { - sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(response.getOutput())); + ActorCatalog catalog = configRepository.getActorCatalogById(UUID.fromString(response.getOutput())); + final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog)); } return sourceDiscoverSchemaRead; diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java index 1b5c7c4e5939d..20edb7693a778 100644 --- 
a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java @@ -6,6 +6,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.DestinationConnection; @@ -20,7 +24,6 @@ import io.airbyte.persistence.job.factory.OAuthConfigSupplier; import io.airbyte.persistence.job.tracker.JobTracker; import io.airbyte.persistence.job.tracker.JobTracker.JobState; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalResponse; @@ -38,6 +41,8 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class); + private static final HashFunction HASH_FUNCTION = Hashing.md5(); + private final TemporalClient temporalClient; private final JobTracker jobTracker; private final JobErrorReporter jobErrorReporter; @@ -101,7 +106,7 @@ public SynchronousResponse createDestinationCheck } @Override - public SynchronousResponse createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) + public SynchronousResponse createDiscoverSchemaJob(final SourceConnection source, final String dockerImage, final String connectorVersion) throws IOException { final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters( source.getSourceDefinitionId(), @@ -109,7 +114,11 @@ public SynchronousResponse createDiscoverSchemaJob(final SourceC source.getConfiguration()); final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig() .withConnectionConfiguration(sourceConfiguration) - .withDockerImage(dockerImage); + .withDockerImage(dockerImage) + .withSourceId(source.getSourceId().toString()) + .withConfigHash(HASH_FUNCTION.hashBytes(Jsons.serialize(source.getConfiguration()).getBytes( + Charsets.UTF_8)).toString()) + .withConnectorVersion(connectorVersion); final UUID jobId = UUID.randomUUID(); final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); @@ -119,7 +128,7 @@ public SynchronousResponse createDiscoverSchemaJob(final SourceC jobReportingContext, source.getSourceDefinitionId(), () -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig), - ConnectorJobOutput::getDiscoverCatalog, + ConnectorJobOutput::getDiscoverCatalogId, source.getWorkspaceId()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java index fbe900c19f9b6..68524cdfe850b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java @@ -7,7 +7,6 @@ import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardCheckConnectionOutput; -import 
io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import java.io.IOException; @@ -23,7 +22,7 @@ SynchronousResponse createSourceCheckConnectionJo SynchronousResponse createDestinationCheckConnectionJob(DestinationConnection destination, String dockerImage) throws IOException; - SynchronousResponse createDiscoverSchemaJob(SourceConnection source, String dockerImage) throws IOException; + SynchronousResponse createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException; SynchronousResponse createGetSpecJob(String dockerImage) throws IOException; diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 0ace1315d8169..87aae52cfdd85 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -94,6 +94,9 @@ class SchedulerHandlerTest { private static final String DESTINATION_DOCKER_TAG = "tag"; private static final String DESTINATION_DOCKER_IMAGE = DockerUtils.getTaggedImageName(DESTINATION_DOCKER_REPO, DESTINATION_DOCKER_TAG); + private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", + Field.of("sku", JsonSchemaType.STRING)); + private static final SourceConnection SOURCE = new SourceConnection() .withName("my postgres db") .withWorkspaceId(UUID.randomUUID()) @@ -356,12 +359,15 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", - Field.of("sku", JsonSchemaType.STRING)); - when(discoverResponse.getOutput()).thenReturn(airbyteCatalog); + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(UUID.randomUUID()); + when(configRepository.getActorCatalogById(any())).thenReturn(actorCatalog); when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); @@ -372,7 +378,7 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio .withSourceDefinitionId(source.getSourceDefinitionId())); when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); when(configRepository.getActorCatalog(any(), any(), any())).thenReturn(Optional.empty()); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE)) + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) .thenReturn(discoverResponse); final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); @@ -382,8 +388,9 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio 
assertTrue(actual.getJobInfo().getSucceeded()); verify(configRepository).getSourceConnection(source.getSourceId()); verify(configRepository).getActorCatalog(eq(request.getSourceId()), eq(SOURCE_DOCKER_TAG), any()); - verify(configRepository).writeActorCatalogFetchEvent(eq(airbyteCatalog), eq(source.getSourceId()), eq(SOURCE_DOCKER_TAG), any()); - verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE); + // verify(configRepository).writeActorCatalogFetchEvent(eq(airbyteCatalog), + // eq(source.getSourceId()), eq(SOURCE_DOCKER_TAG), any()); + verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @Test @@ -391,12 +398,10 @@ void testDiscoverSchemaForSourceFromSourceIdCachedCatalog() throws IOException, final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", - Field.of("sku", JsonSchemaType.STRING)); - when(discoverResponse.getOutput()).thenReturn(airbyteCatalog); + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); @@ -411,7 +416,7 @@ void testDiscoverSchemaForSourceFromSourceIdCachedCatalog() throws IOException, .withCatalogHash("") .withId(UUID.randomUUID()); when(configRepository.getActorCatalog(any(), any(), any())).thenReturn(Optional.of(actorCatalog)); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE)) + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) .thenReturn(discoverResponse); final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); @@ -422,7 +427,7 @@ void testDiscoverSchemaForSourceFromSourceIdCachedCatalog() throws IOException, verify(configRepository).getSourceConnection(source.getSourceId()); verify(configRepository).getActorCatalog(eq(request.getSourceId()), any(), any()); verify(configRepository, never()).writeActorCatalogFetchEvent(any(), any(), any(), any()); - verify(synchronousSchedulerClient, never()).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE); + verify(synchronousSchedulerClient, never()).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @Test @@ -430,12 +435,11 @@ void testDiscoverSchemaForSourceFromSourceIdDisableCache() throws IOException, J final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).disableCache(true); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", - Field.of("sku", 
JsonSchemaType.STRING)); - when(discoverResponse.getOutput()).thenReturn(airbyteCatalog); + final UUID discoveredCatalogId = UUID.randomUUID(); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId.toString()); when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); @@ -448,9 +452,9 @@ void testDiscoverSchemaForSourceFromSourceIdDisableCache() throws IOException, J final ActorCatalog actorCatalog = new ActorCatalog() .withCatalog(Jsons.jsonNode(airbyteCatalog)) .withCatalogHash("") - .withId(UUID.randomUUID()); - when(configRepository.getActorCatalog(any(), any(), any())).thenReturn(Optional.of(actorCatalog)); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE)) + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) .thenReturn(discoverResponse); final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); @@ -460,8 +464,7 @@ void testDiscoverSchemaForSourceFromSourceIdDisableCache() throws IOException, J assertTrue(actual.getJobInfo().getSucceeded()); verify(configRepository).getSourceConnection(source.getSourceId()); verify(configRepository).getActorCatalog(eq(request.getSourceId()), any(), any()); - verify(configRepository).writeActorCatalogFetchEvent(any(), any(), any(), any()); - verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @Test @@ -475,8 +478,8 @@ void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonVal .withDockerImageTag(SOURCE_DOCKER_TAG) .withSourceDefinitionId(source.getSourceDefinitionId())); when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE)) - .thenReturn((SynchronousResponse) jobResponse); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) + .thenReturn((SynchronousResponse) jobResponse); when(completedJob.getSuccessOutput()).thenReturn(Optional.empty()); when(completedJob.getStatus()).thenReturn(JobStatus.FAILED); @@ -486,7 +489,7 @@ void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonVal assertNotNull(actual.getJobInfo()); assertFalse(actual.getJobInfo().getSucceeded()); verify(configRepository).getSourceConnection(source.getSourceId()); - verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @Test @@ -495,23 +498,28 @@ void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationExceptio .withSourceDefinitionId(SOURCE.getSourceDefinitionId()) .withConfiguration(SOURCE.getConfiguration()); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(new AirbyteCatalog()); + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); 
when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); final SourceCoreConfig sourceCoreConfig = new SourceCoreConfig() .sourceDefinitionId(source.getSourceDefinitionId()) .connectionConfiguration(source.getConfiguration()); - + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(UUID.randomUUID()); + when(configRepository.getActorCatalogById(any())).thenReturn(actorCatalog); when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) .thenReturn(new StandardSourceDefinition() .withDockerRepository(SOURCE_DOCKER_REPO) .withDockerImageTag(SOURCE_DOCKER_TAG) .withSourceDefinitionId(source.getSourceDefinitionId())); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE)) + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) .thenReturn(discoverResponse); when(secretsRepositoryWriter.statefulSplitEphemeralSecrets( eq(source.getConfiguration()), @@ -522,7 +530,7 @@ void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationExceptio assertNotNull(actual.getCatalog()); assertNotNull(actual.getJobInfo()); assertTrue(actual.getJobInfo().getSucceeded()); - verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @Test @@ -540,8 +548,8 @@ void testDiscoverSchemaForSourceFromSourceCreateFailed() throws JsonValidationEx .withDockerRepository(SOURCE_DOCKER_REPO) .withDockerImageTag(SOURCE_DOCKER_TAG) .withSourceDefinitionId(source.getSourceDefinitionId())); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE)) - .thenReturn((SynchronousResponse) jobResponse); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) + .thenReturn((SynchronousResponse) jobResponse); when(secretsRepositoryWriter.statefulSplitEphemeralSecrets( eq(source.getConfiguration()), any())).thenReturn(source.getConfiguration()); @@ -553,7 +561,7 @@ void testDiscoverSchemaForSourceFromSourceCreateFailed() throws JsonValidationEx assertNull(actual.getCatalog()); assertNotNull(actual.getJobInfo()); assertFalse(actual.getJobInfo().getSucceeded()); - verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java index 79bdbaf4d2aaf..6952f3d3e8e08 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java @@ -33,7 +33,6 @@ import io.airbyte.persistence.job.factory.OAuthConfigSupplier; import io.airbyte.persistence.job.tracker.JobTracker; import io.airbyte.persistence.job.tracker.JobTracker.JobState; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.workers.temporal.JobMetadata; import 
io.airbyte.workers.temporal.TemporalClient; @@ -55,6 +54,8 @@ class DefaultSynchronousSchedulerClientTest { private static final Path LOG_PATH = Path.of("/tmp"); private static final String DOCKER_IMAGE = "foo/bar"; + + private static final String DOCKER_IMAGE_TAG = "baz/qux"; private static final UUID WORKSPACE_ID = UUID.randomUUID(); private static final UUID UUID1 = UUID.randomUUID(); private static final UUID UUID2 = UUID.randomUUID(); @@ -229,16 +230,12 @@ void testCreateDestinationCheckConnectionJob() throws IOException { @Test void testCreateDiscoverSchemaJob() throws IOException { - final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig() - .withConnectionConfiguration(SOURCE_CONNECTION.getConfiguration()) - .withDockerImage(DOCKER_IMAGE); - final AirbyteCatalog mockOutput = mock(AirbyteCatalog.class); - final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalog(mockOutput); - when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(jobDiscoverCatalogConfig))) + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId("foo"); + when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), any(JobDiscoverCatalogConfig.class))) .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); - final SynchronousResponse response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE); - assertEquals(mockOutput, response.getOutput()); + final SynchronousResponse response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE, DOCKER_IMAGE_TAG); + assertEquals("foo", response.getOutput()); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index 7ecbb8d0d9da8..675035603e1d4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -126,7 +126,7 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out final ConnectorCommand connectorCommand = switch (outputType) { case SPEC -> ConnectorCommand.SPEC; case CHECK_CONNECTION -> ConnectorCommand.CHECK; - case DISCOVER_CATALOG -> ConnectorCommand.DISCOVER; + case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER; }; final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 474d206137f39..25f77f572b3a6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -10,6 +10,7 @@ import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -24,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -32,7 +34,8 @@ public class 
DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDiscoverCatalogWorker.class); - private static final int TEMPORAL_MESSAGE_LIMIT_MB = 1024 * 1024 * 4; + + private final ConfigRepository configRepository; private final WorkerConfigs workerConfigs; private final IntegrationLauncher integrationLauncher; @@ -40,16 +43,20 @@ public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker { private volatile Process process; - public DefaultDiscoverCatalogWorker(final WorkerConfigs workerConfigs, + public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository, + final WorkerConfigs workerConfigs, final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory) { + this.configRepository = configRepository; this.workerConfigs = workerConfigs; this.integrationLauncher = integrationLauncher; this.streamFactory = streamFactory; } - public DefaultDiscoverCatalogWorker(final WorkerConfigs workerConfigs, final IntegrationLauncher integrationLauncher) { - this(workerConfigs, integrationLauncher, new DefaultAirbyteStreamFactory()); + public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository, + final WorkerConfigs workerConfigs, + final IntegrationLauncher integrationLauncher) { + this(configRepository, workerConfigs, integrationLauncher, new DefaultAirbyteStreamFactory()); } @Override @@ -82,16 +89,14 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI throw new WorkerException("Integration failed to output a catalog struct."); } - // This message is sent through temporal that utilizes GRPC as is set to limit messages to 4mb. - // We fail fast here so users will not have to wait the 10 minutes before a timeout error occurs. - if (catalog.get().toString().length() > TEMPORAL_MESSAGE_LIMIT_MB) { - throw new WorkerException("Output a catalog struct bigger than 4mb. 
Larger than grpc max message limit."); - } - - return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG).withDiscoverCatalog(catalog.get()); + final UUID catalogId = + configRepository.writeActorCatalogFetchEvent(catalog.get(), UUID.fromString(discoverSchemaInput.getSourceId()), + discoverSchemaInput.getConnectorVersion(), + discoverSchemaInput.getConfigHash()); + return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId.toString()); } else { return WorkerUtils.getJobFailureOutputOrThrow( - OutputType.DISCOVER_CATALOG, + OutputType.DISCOVER_CATALOG_ID, messagesByType, String.format("Discover job subprocess finished with exit code %s", exitCode)); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 8d6e7514dfe17..1174de133c686 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -140,7 +140,8 @@ public TemporalResponse submitDiscoverSchema(final UUID jobI .withJobId(jobId.toString()) .withAttemptId((long) attempt) .withDockerImage(config.getDockerImage()); - final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput().withConnectionConfiguration(config.getConnectionConfiguration()); + final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput().withConnectionConfiguration(config.getConnectionConfiguration()) + .withSourceId(config.getSourceId()).withConnectorVersion(config.getConnectorVersion()).withConfigHash(config.getConfigHash()); return execute(jobRunConfig, () -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input)); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 15bdbb4e74cf1..11c3ae1eae765 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -12,6 +12,7 @@ import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; @@ -48,14 +49,18 @@ public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { private final AirbyteApiClient airbyteApiClient;; private final String airbyteVersion; + private final ConfigRepository configRepository; + public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, @Named("discoverProcessFactory") final ProcessFactory processFactory, + final ConfigRepository configRepository, final SecretsHydrator secretsHydrator, @Named("workspaceRoot") final Path workspaceRoot, final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion) { + this.configRepository = configRepository; 
this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; @@ -73,7 +78,10 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration()); final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput() - .withConnectionConfiguration(fullConfig); + .withConnectionConfiguration(fullConfig) + .withSourceId(config.getSourceId()) + .withConnectorVersion(config.getConnectorVersion()) + .withConfigHash(config.getConfigHash()); final ActivityExecutionContext context = Activity.getExecutionContext(); @@ -99,7 +107,7 @@ private CheckedSupplier new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), processFactory, workerConfigs.getResourceRequirements()); final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory(); - return new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory); + return new DefaultDiscoverCatalogWorker(configRepository, workerConfigs, integrationLauncher, streamFactory); }; } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index 5d99f6175756f..1eb445810970e 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -12,6 +12,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -28,6 +30,7 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -45,14 +48,22 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.UUID; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; class DefaultDiscoverCatalogWorkerTest { + @Mock + private ConfigRepository mConfigRepository; + private static final JsonNode CREDENTIALS = Jsons.jsonNode(ImmutableMap.builder().put("apiKey", "123").build()); - private static final StandardDiscoverCatalogInput INPUT = new StandardDiscoverCatalogInput().withConnectionConfiguration(CREDENTIALS); + + private static final UUID SOURCE_ID = UUID.randomUUID(); + private static final StandardDiscoverCatalogInput INPUT = + new StandardDiscoverCatalogInput().withConnectionConfiguration(CREDENTIALS).withSourceId(SOURCE_ID.toString()); private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private static final String STREAM = "users"; @@ -70,12 +81,18 @@ class DefaultDiscoverCatalogWorkerTest { private Process process; private AirbyteStreamFactory streamFactory; + private UUID CATALOG_ID; + 
@BeforeEach void setup() throws Exception { workerConfigs = new WorkerConfigs(new EnvConfigs()); jobRoot = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), ""); integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS); process = mock(Process.class); + mConfigRepository = mock(ConfigRepository.class); + + CATALOG_ID = UUID.randomUUID(); + when(mConfigRepository.writeActorCatalogFetchEvent(any(), any(), any(), any())).thenReturn(CATALOG_ID); when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))).thenReturn(process); final InputStream inputStream = mock(InputStream.class); @@ -90,12 +107,14 @@ void setup() throws Exception { @SuppressWarnings("BusyWait") @Test void testDiscoverSchema() throws Exception { - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, workerConfigs, integrationLauncher, streamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); assertNull(output.getFailureReason()); - assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); - assertEquals(CATALOG, output.getDiscoverCatalog()); + assertEquals(OutputType.DISCOVER_CATALOG_ID, output.getOutputType()); + assertEquals(CATALOG_ID.toString(), output.getDiscoverCatalogId()); + verify(mConfigRepository).writeActorCatalogFetchEvent(eq(CATALOG), eq(SOURCE_ID), any(), any()); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { while (process.getErrorStream().available() != 0) { @@ -111,7 +130,8 @@ void testDiscoverSchema() throws Exception { void testDiscoverSchemaProcessFail() throws Exception { when(process.exitValue()).thenReturn(1); - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, workerConfigs, integrationLauncher, streamFactory); assertThrows(WorkerException.class, () -> worker.run(INPUT, jobRoot)); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { @@ -131,10 +151,11 @@ void testDiscoverSchemaProcessFailWithTraceMessage() throws Exception { when(process.exitValue()).thenReturn(1); - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, traceStreamFactory); + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, workerConfigs, integrationLauncher, traceStreamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); - assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); - assertNull(output.getDiscoverCatalog()); + // assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); + // assertNull(output.getDiscoverCatalog()); assertNotNull(output.getFailureReason()); final FailureReason failureReason = output.getFailureReason(); @@ -154,13 +175,15 @@ void testDiscoverSchemaException() throws WorkerException { when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))) .thenThrow(new RuntimeException()); - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, workerConfigs, 
integrationLauncher, streamFactory); assertThrows(WorkerException.class, () -> worker.run(INPUT, jobRoot)); } @Test void testCancel() throws WorkerException { - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, workerConfigs, integrationLauncher, streamFactory); worker.run(INPUT, jobRoot); worker.cancel(); diff --git a/tools/bin/make-big-schema.sh b/tools/bin/make-big-schema.sh new file mode 100755 index 0000000000000..ea26b032e3eb2 --- /dev/null +++ b/tools/bin/make-big-schema.sh @@ -0,0 +1,10 @@ +# This script creates lots of tables in a database. It is used to test the handling of large schemas. +# Pre-requisite: follow https://airbyte.com/tutorials/postgres-replication to create the source container. + +for i in {1..15000} +do + docker exec -it airbyte-source psql -U postgres -c "CREATE TABLE users$i(id SERIAL PRIMARY KEY, col1 VARCHAR(200));"; + docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public.users$i(col1) VALUES('record1');"; + docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public.users$i(col1) VALUES('record2');"; + docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public.users$i(col1) VALUES('record3');"; +done \ No newline at end of file From 9f6e90fbac0882777e5d392618763f8b216c80a8 Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Mon, 3 Oct 2022 21:28:35 +0200 Subject: [PATCH 2/5] update generic source tests to handle new approach to schema discovery --- .../bases/standard-source-test/build.gradle | 2 ++ .../source/AbstractSourceConnectorTest.java | 23 ++++++++++++++++--- .../AbstractSourceDatabaseTypeTest.java | 3 ++- .../source/SourceAcceptanceTest.java | 7 +++--- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/bases/standard-source-test/build.gradle b/airbyte-integrations/bases/standard-source-test/build.gradle index de29cff38ce83..f6d787893dc99 100644 --- a/airbyte-integrations/bases/standard-source-test/build.gradle +++ b/airbyte-integrations/bases/standard-source-test/build.gradle @@ -14,8 +14,10 @@ import org.jsoup.Jsoup; dependencies { implementation project(':airbyte-db:db-lib') implementation project(':airbyte-config:config-models') + implementation project(':airbyte-config:config-persistence') implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-workers') + implementation 'org.mockito:mockito-core:4.6.1' implementation 'net.sourceforge.argparse4j:argparse4j:0.8.1' diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index ecea6f96332e6..05b8646af5952 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@ -6,6 +6,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import 
com.fasterxml.jackson.databind.JsonNode; import io.airbyte.config.EnvConfigs; @@ -16,6 +19,7 @@ import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.State; import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -42,6 +46,7 @@ import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +106,14 @@ public abstract class AbstractSourceConnectorTest { private WorkerConfigs workerConfigs; + private ConfigRepository mConfigRepository; + + private final ArgumentCaptor lastPersistedCatalog = ArgumentCaptor.forClass(AirbyteCatalog.class); + + protected AirbyteCatalog getLastPersistedCatalog() { + return lastPersistedCatalog.getValue(); + } + @BeforeEach public void setUpInternal() throws Exception { final Path testDir = Path.of("/tmp/airbyte_tests/"); @@ -110,6 +123,7 @@ public void setUpInternal() throws Exception { localRoot = Files.createTempDirectory(testDir, "output"); environment = new TestDestinationEnv(localRoot); workerConfigs = new WorkerConfigs(new EnvConfigs()); + mConfigRepository = mock(ConfigRepository.class); processFactory = new DockerProcessFactory( workerConfigs, workspaceRoot, @@ -146,11 +160,14 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString(); } - protected AirbyteCatalog runDiscover() throws Exception { - return new DefaultDiscoverCatalogWorker( + protected String runDiscover() throws Exception { + final String toReturn = new DefaultDiscoverCatalogWorker( + mConfigRepository, workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements())) - .run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot).getDiscoverCatalog(); + .run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot).getDiscoverCatalogId(); + verify(mConfigRepository).writeActorCatalogFetchEvent(lastPersistedCatalog.capture(), any(), any(), any()); + return toReturn; } protected void checkEntrypointEnvVariable() throws Exception { diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java index 7f5adf34bc2b1..63d7dc5dd9e8b 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java @@ -108,7 +108,8 @@ protected boolean testCatalog() { public void testDataTypes() throws Exception { final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); final List allMessages = runRead(catalog); - final Map streams = runDiscover().getStreams().stream() + final String catalogId = runDiscover(); + final Map streams = getLastPersistedCatalog().getStreams().stream() 
.collect(Collectors.toMap(AirbyteStream::getName, s -> s)); final List recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList(); final Map> expectedValues = new HashMap<>(); diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java index a6e2d50c85aa9..84f9b2513859f 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java @@ -153,9 +153,10 @@ public void testCheckConnection() throws Exception { */ @Test public void testDiscover() throws Exception { - final AirbyteCatalog discoverOutput = runDiscover(); - assertNotNull(discoverOutput, "Expected discover to produce a catalog"); - verifyCatalog(discoverOutput); + final String discoverOutput = runDiscover(); + final AirbyteCatalog discoveredCatalog = getLastPersistedCatalog(); + assertNotNull(discoveredCatalog, "Expected discover to produce a catalog"); + verifyCatalog(discoveredCatalog); } /** From 70847f8bb3085bf5d3523abcee4cb83dc7b5ba6a Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Mon, 3 Oct 2022 21:29:00 +0200 Subject: [PATCH 3/5] readability improvements related to schema discovery and large schema support --- .../resources/types/ConnectorJobOutput.yaml | 2 ++ .../types/JobDiscoverCatalogConfig.yaml | 9 +++++--- .../server/handlers/SchedulerHandler.java | 10 ++++---- .../DefaultSynchronousSchedulerClient.java | 2 +- .../scheduler/SynchronousSchedulerClient.java | 3 ++- .../server/handlers/SchedulerHandlerTest.java | 23 ++++++++----------- ...DefaultSynchronousSchedulerClientTest.java | 8 +++---- .../general/DefaultDiscoverCatalogWorker.java | 2 +- .../DefaultDiscoverCatalogWorkerTest.java | 2 +- tools/bin/make-big-schema.sh | 2 +- 10 files changed, 33 insertions(+), 30 deletions(-) diff --git a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml index 48376f03848ad..48407348bbd42 100644 --- a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml @@ -17,7 +17,9 @@ properties: checkConnection: "$ref": StandardCheckConnectionOutput.yaml discoverCatalogId: + description: A UUID for the discovered catalog which is persisted by the job type: string + format: uuid spec: existingJavaType: io.airbyte.protocol.models.ConnectorSpecification failureReason: diff --git a/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml index 406aa47f25131..174e935bf997a 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml @@ -18,12 +18,15 @@ properties: existingJavaType: com.fasterxml.jackson.databind.JsonNode dockerImage: type: string + description: The connector image + example: airbyte/source-postgres:1.0.12 sourceId: - description: The ID of the source being discovered, so we can persist the result + description: The ID of the 
source being discovered, so we can persist this alongside the discovered catalog type: string connectorVersion: - description: Connector version + description: Connector version, so we can persist this alongside the discovered catalog type: string + example: 1.0.12 configHash: - description: Config hash + description: Hash of the source configuration -- see `configuration` field in SourceConnection.yaml -- so we can persist this alongside the discovered catalog. type: string diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 768e0c841df2a..9db712ff35ed1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -220,8 +220,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source configRepository.getActorCatalog(discoverSchemaRequestBody.getSourceId(), connectorVersion, configHash); final boolean bustActorCatalogCache = discoverSchemaRequestBody.getDisableCache() != null && discoverSchemaRequestBody.getDisableCache(); if (currentCatalog.isEmpty() || bustActorCatalogCache) { - final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion); - return discoverJobToOutput(response); + final SynchronousResponse persistedCatalogId = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion); + return retrieveDiscoveredSchema(persistedCatalogId); } final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class); final SynchronousJobRead emptyJob = new SynchronousJobRead() @@ -251,16 +251,16 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So final SourceConnection source = new SourceConnection() .withSourceDefinitionId(sourceCreate.getSourceDefinitionId()) .withConfiguration(partialConfig); - final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag()); + final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag()); return discoverJobToOutput(response); } - private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse response) throws ConfigNotFoundException, IOException { + private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousResponse response) throws ConfigNotFoundException, IOException { final SourceDiscoverSchemaRead sourceDiscoverSchemaRead = new SourceDiscoverSchemaRead() .jobInfo(jobConverter.getSynchronousJobRead(response)); if (response.isSuccess()) { - ActorCatalog catalog = configRepository.getActorCatalogById(UUID.fromString(response.getOutput())); + ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput()); final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java index 20edb7693a778..749fa0b08bed1 100644 --- 
a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java @@ -106,7 +106,7 @@ public SynchronousResponse createDestinationCheck } @Override - public SynchronousResponse createDiscoverSchemaJob(final SourceConnection source, final String dockerImage, final String connectorVersion) + public SynchronousResponse createDiscoverSchemaJob(final SourceConnection source, final String dockerImage, final String connectorVersion) throws IOException { final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters( source.getSourceDefinitionId(), diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java index 68524cdfe850b..9bdb462280e8b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java @@ -9,6 +9,7 @@ import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.protocol.models.ConnectorSpecification; import java.io.IOException; +import java.util.UUID; /** * Exposes a way of executing short-lived jobs as RPC calls. Blocks until the job completes. No @@ -22,7 +23,7 @@ SynchronousResponse createSourceCheckConnectionJo SynchronousResponse createDestinationCheckConnectionJob(DestinationConnection destination, String dockerImage) throws IOException; - SynchronousResponse createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException; + SynchronousResponse createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException; SynchronousResponse createGetSpecJob(String dockerImage) throws IOException; diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 87aae52cfdd85..5dbe9a8149a8c 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -359,10 +359,10 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID()); final ActorCatalog actorCatalog = new ActorCatalog() .withCatalog(Jsons.jsonNode(airbyteCatalog)) .withCatalogHash("") @@ -388,8 +388,6 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio assertTrue(actual.getJobInfo().getSucceeded()); verify(configRepository).getSourceConnection(source.getSourceId()); verify(configRepository).getActorCatalog(eq(request.getSourceId()), eq(SOURCE_DOCKER_TAG), any()); - // 
verify(configRepository).writeActorCatalogFetchEvent(eq(airbyteCatalog), - // eq(source.getSourceId()), eq(SOURCE_DOCKER_TAG), any()); verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG); } @@ -398,10 +396,10 @@ void testDiscoverSchemaForSourceFromSourceIdCachedCatalog() throws IOException, final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID()); when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); @@ -435,11 +433,11 @@ void testDiscoverSchemaForSourceFromSourceIdDisableCache() throws IOException, J final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).disableCache(true); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); final UUID discoveredCatalogId = UUID.randomUUID(); - when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId.toString()); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); @@ -479,7 +477,7 @@ void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonVal .withSourceDefinitionId(source.getSourceDefinitionId())); when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) - .thenReturn((SynchronousResponse) jobResponse); + .thenReturn((SynchronousResponse) jobResponse); when(completedJob.getSuccessOutput()).thenReturn(Optional.empty()); when(completedJob.getStatus()).thenReturn(JobStatus.FAILED); @@ -498,17 +496,16 @@ void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationExceptio .withSourceDefinitionId(SOURCE.getSourceDefinitionId()) .withConfiguration(SOURCE.getConfiguration()); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class); when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); + when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID()); when(discoverResponse.getMetadata()).thenReturn(metadata); when(metadata.isSucceeded()).thenReturn(true); final SourceCoreConfig sourceCoreConfig = new SourceCoreConfig() .sourceDefinitionId(source.getSourceDefinitionId()) .connectionConfiguration(source.getConfiguration()); - 
when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID().toString()); final ActorCatalog actorCatalog = new ActorCatalog() .withCatalog(Jsons.jsonNode(airbyteCatalog)) .withCatalogHash("") @@ -549,7 +546,7 @@ void testDiscoverSchemaForSourceFromSourceCreateFailed() throws JsonValidationEx .withDockerImageTag(SOURCE_DOCKER_TAG) .withSourceDefinitionId(source.getSourceDefinitionId())); when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG)) - .thenReturn((SynchronousResponse) jobResponse); + .thenReturn((SynchronousResponse) jobResponse); when(secretsRepositoryWriter.statefulSplitEphemeralSecrets( eq(source.getConfiguration()), any())).thenReturn(source.getConfiguration()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java index 6952f3d3e8e08..e8e920be670bd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java @@ -230,12 +230,12 @@ void testCreateDestinationCheckConnectionJob() throws IOException { @Test void testCreateDiscoverSchemaJob() throws IOException { - - final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId("foo"); + final UUID expectedCatalogId = UUID.randomUUID(); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId(expectedCatalogId); when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), any(JobDiscoverCatalogConfig.class))) .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); - final SynchronousResponse response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE, DOCKER_IMAGE_TAG); - assertEquals("foo", response.getOutput()); + final SynchronousResponse response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE, DOCKER_IMAGE_TAG); + assertEquals(expectedCatalogId, response.getOutput()); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 25f77f572b3a6..a8d633c7271fe 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -93,7 +93,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI configRepository.writeActorCatalogFetchEvent(catalog.get(), UUID.fromString(discoverSchemaInput.getSourceId()), discoverSchemaInput.getConnectorVersion(), discoverSchemaInput.getConfigHash()); - return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId.toString()); + return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId); } else { return WorkerUtils.getJobFailureOutputOrThrow( OutputType.DISCOVER_CATALOG_ID, diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index 1eb445810970e..32c5decce568c 100644 --- 
a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -113,7 +113,7 @@ void testDiscoverSchema() throws Exception { assertNull(output.getFailureReason()); assertEquals(OutputType.DISCOVER_CATALOG_ID, output.getOutputType()); - assertEquals(CATALOG_ID.toString(), output.getDiscoverCatalogId()); + assertEquals(CATALOG_ID, output.getDiscoverCatalogId()); verify(mConfigRepository).writeActorCatalogFetchEvent(eq(CATALOG), eq(SOURCE_ID), any(), any()); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { diff --git a/tools/bin/make-big-schema.sh b/tools/bin/make-big-schema.sh index ea26b032e3eb2..68919164ad50b 100755 --- a/tools/bin/make-big-schema.sh +++ b/tools/bin/make-big-schema.sh @@ -7,4 +7,4 @@ do docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public.users$i(col1) VALUES('record1');"; docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public.users$i(col1) VALUES('record2');"; docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public.users$i(col1) VALUES('record3');"; -done \ No newline at end of file +done From b5b988679feadc90c2e504b7dded36fa0fcb27ef Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Mon, 3 Oct 2022 22:11:17 +0200 Subject: [PATCH 4/5] update internal SchedulerHandler method name --- .../main/java/io/airbyte/server/handlers/SchedulerHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 9db712ff35ed1..179609466aacc 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -252,7 +252,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So .withSourceDefinitionId(sourceCreate.getSourceDefinitionId()) .withConfiguration(partialConfig); final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag()); - return discoverJobToOutput(response); + return retrieveDiscoveredSchema(response); } private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousResponse response) throws ConfigNotFoundException, IOException { From f724ae4ab6db0fe9e480715aba0fcf5e69b126bf Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Wed, 5 Oct 2022 18:39:19 +0200 Subject: [PATCH 5/5] update source tests per new schema discovery interface --- .../standardtest/source/AbstractSourceConnectorTest.java | 5 +++-- .../standardtest/source/AbstractSourceDatabaseTypeTest.java | 3 ++- .../standardtest/source/SourceAcceptanceTest.java | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index 4a4d60b937711..488bc46deb375 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@
-44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.mockito.ArgumentCaptor; @@ -157,8 +158,8 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString(); } - protected String runDiscover() throws Exception { - final String toReturn = new DefaultDiscoverCatalogWorker( + protected UUID runDiscover() throws Exception { + final UUID toReturn = new DefaultDiscoverCatalogWorker( mConfigRepository, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements())) .run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot).getDiscoverCatalogId(); diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java index 63d7dc5dd9e8b..fcc0f79534828 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -108,7 +109,7 @@ protected boolean testCatalog() { public void testDataTypes() throws Exception { final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); final List allMessages = runRead(catalog); - final String catalogId = runDiscover(); + final UUID catalogId = runDiscover(); final Map streams = getLastPersistedCatalog().getStreams().stream() .collect(Collectors.toMap(AirbyteStream::getName, s -> s)); final List recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList(); diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java index 84f9b2513859f..97df8de9de00d 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -153,7 +154,7 @@ public void testCheckConnection() throws Exception { */ @Test public void testDiscover() throws Exception { - final String discoverOutput = runDiscover(); + final UUID discoverOutput = runDiscover(); final AirbyteCatalog discoveredCatalog = getLastPersistedCatalog(); assertNotNull(discoveredCatalog, "Expected discover to produce a catalog"); verifyCatalog(discoveredCatalog);
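
The overall shape of this patch series is easier to see in a compact sketch than across the individual hunks. The Java below is an illustration only, not code from the patches: CatalogStore and ActorCatalog are simplified, hypothetical stand-ins for Airbyte's ConfigRepository and its persisted catalog model, and the method names mirror writeActorCatalogFetchEvent and getActorCatalogById from the diffs above. What it demonstrates is that the worker now persists the (potentially very large) discovered catalog and returns only its id, and the server resolves that id back into a full catalog before building the API response; presumably this keeps big catalogs out of the synchronous job output itself, which matches the series title "support large schema discovery".

// Minimal sketch of the post-change discovery flow. Illustration only; the real
// ConfigRepository, ActorCatalog, and SynchronousResponse types live in the Airbyte
// codebase, and the stand-ins below are simplified assumptions.
import java.util.UUID;

public class DiscoverFlowSketch {

  // Stand-in for the persisted catalog row, keyed by id.
  record ActorCatalog(UUID id, String catalogJson) {}

  // Stand-in for the small slice of ConfigRepository this flow needs.
  interface CatalogStore {
    UUID writeActorCatalogFetchEvent(String catalogJson, UUID sourceId, String connectorVersion, String configHash);
    ActorCatalog getActorCatalogById(UUID catalogId);
  }

  // Worker side: persist the discovered catalog and hand back only its id, so the id,
  // not the whole catalog, travels through the connector job output.
  static UUID runDiscover(final CatalogStore store, final String discoveredCatalogJson,
                          final UUID sourceId, final String connectorVersion, final String configHash) {
    return store.writeActorCatalogFetchEvent(discoveredCatalogJson, sourceId, connectorVersion, configHash);
  }

  // Server side: resolve the id back into the persisted catalog before answering the API call.
  static String retrieveDiscoveredSchema(final CatalogStore store, final UUID catalogId) {
    return store.getActorCatalogById(catalogId).catalogJson();
  }
}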
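
The standard-source-test changes in PATCH 5/5 follow the same contract: runDiscover() now returns the UUID of the persisted catalog, and assertions read the catalog back from storage (getLastPersistedCatalog() in the real harness) rather than receiving it inline. The sketch below is hypothetical and self-contained; the in-memory map stands in for ConfigRepository, and DiscoveredStream stands in for io.airbyte.protocol.models.AirbyteStream.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

public class DiscoverTestContractSketch {

  // Stand-in for a discovered stream (the real type is AirbyteStream).
  record DiscoveredStream(String name) {}

  // In-memory stand-in for the catalog persistence that ConfigRepository provides.
  private final Map<UUID, List<DiscoveredStream>> persistedCatalogs = new HashMap<>();

  // Stand-in for runDiscover(): persist the catalog, return only its id.
  UUID runDiscover(final List<DiscoveredStream> discovered) {
    final UUID catalogId = UUID.randomUUID();
    persistedCatalogs.put(catalogId, discovered);
    return catalogId;
  }

  // Stand-in for getLastPersistedCatalog(): fetch the catalog by the id discover returned.
  List<DiscoveredStream> getPersistedCatalog(final UUID catalogId) {
    return persistedCatalogs.get(catalogId);
  }

  // Mirrors the shape of the updated testDataTypes(): discover, then index streams by name.
  Map<String, DiscoveredStream> discoverAndIndex(final List<DiscoveredStream> discovered) {
    final UUID catalogId = runDiscover(discovered);
    return getPersistedCatalog(catalogId).stream()
        .collect(Collectors.toMap(DiscoveredStream::name, s -> s));
  }
}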