support large schema discovery #17394

Merged · 7 commits · Oct 5, 2022
ConnectorJobOutput.yaml:

@@ -12,12 +12,14 @@ properties:
     type: string
     enum:
       - checkConnection
-      - discoverCatalog
+      - discoverCatalogId
       - spec
   checkConnection:
     "$ref": StandardCheckConnectionOutput.yaml
-  discoverCatalog:
-    existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
+  discoverCatalogId:
+    description: A UUID for the discovered catalog which is persisted by the job
+    type: string
+    format: uuid
   spec:
     existingJavaType: io.airbyte.protocol.models.ConnectorSpecification
   failureReason:
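Under the new schema, a successful discover job reports only the persisted catalog's UUID instead of embedding the full AirbyteCatalog. A minimal sketch of populating the new output, assuming the builder and enum-constant names that Airbyte generates for these config models:

  import java.util.UUID;

  import io.airbyte.config.ConnectorJobOutput;

  // Sketch only: DISCOVER_CATALOG_ID / withDiscoverCatalogId are assumed from the
  // generated model; the worker persists the catalog first, then records its ID here.
  final ConnectorJobOutput output = new ConnectorJobOutput()
      .withOutputType(ConnectorJobOutput.OutputType.DISCOVER_CATALOG_ID) // replaces the old discoverCatalog output
      .withDiscoverCatalogId(UUID.randomUUID()); // in practice, the UUID returned by the catalog write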
JobDiscoverCatalogConfig.yaml:

@@ -8,10 +8,25 @@ additionalProperties: false
 required:
   - connectionConfiguration
   - dockerImage
+  - sourceId
+  - connectorVersion
+  - configHash
 properties:
   connectionConfiguration:
     description: Integration specific blob. Must be a valid JSON string.
     type: object
     existingJavaType: com.fasterxml.jackson.databind.JsonNode
   dockerImage:
     type: string
+    description: The connector image
+    example: airbyte/source-postgres:1.0.12
+  sourceId:
+    description: The ID of the source being discovered, so we can persist this alongside the discovered catalog
+    type: string
+  connectorVersion:
+    description: Connector version, so we can persist this alongside the discovered catalog
+    type: string
+    example: 1.0.12
+  configHash:
+    description: Hash of the source configuration -- see `configuration` field in SourceConnection.yaml -- so we can persist this alongside the discovered catalog.
+    type: string
StandardDiscoverCatalogInput.yaml:

@@ -6,9 +6,21 @@ description: information required for connection.
 type: object
 required:
   - connectionConfiguration
+  - sourceId
+  - connectorVersion
+  - configHash
 additionalProperties: false
 properties:
   connectionConfiguration:
     description: Integration specific blob. Must be a valid JSON string.
     type: object
     existingJavaType: com.fasterxml.jackson.databind.JsonNode
+  sourceId:
+    description: The ID of the source being discovered, so we can persist the result
+    type: string
+  connectorVersion:
+    description: Connector version, so we can persist the result
+    type: string
+  configHash:
+    description: Config hash, so we can persist the result
+    type: string
airbyte-integrations/bases/standard-source-test/build.gradle (2 additions, 0 deletions):

@@ -14,8 +14,10 @@ import org.jsoup.Jsoup;
 dependencies {
     implementation project(':airbyte-db:db-lib')
     implementation project(':airbyte-config:config-models')
+    implementation project(':airbyte-config:config-persistence')
     implementation project(':airbyte-protocol:protocol-models')
     implementation project(':airbyte-workers')
+    implementation 'org.mockito:mockito-core:4.6.1'

     implementation 'net.sourceforge.argparse4j:argparse4j:0.8.1'
AbstractSourceConnectorTest.java:

@@ -6,6 +6,9 @@

 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;

 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.config.EnvConfigs;
@@ -16,6 +19,7 @@
 import io.airbyte.config.StandardDiscoverCatalogInput;
 import io.airbyte.config.State;
 import io.airbyte.config.WorkerSourceConfig;
+import io.airbyte.config.persistence.ConfigRepository;
 import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -40,8 +44,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -101,6 +107,14 @@ public abstract class AbstractSourceConnectorTest {

   private WorkerConfigs workerConfigs;

+  private ConfigRepository mConfigRepository;
+
+  private final ArgumentCaptor<AirbyteCatalog> lastPersistedCatalog = ArgumentCaptor.forClass(AirbyteCatalog.class);
+
+  protected AirbyteCatalog getLastPersistedCatalog() {
+    return lastPersistedCatalog.getValue();
+  }
+
   @BeforeEach
   public void setUpInternal() throws Exception {
     final Path testDir = Path.of("/tmp/airbyte_tests/");
@@ -110,6 +124,7 @@ public void setUpInternal() throws Exception {
     localRoot = Files.createTempDirectory(testDir, "output");
     environment = new TestDestinationEnv(localRoot);
     workerConfigs = new WorkerConfigs(new EnvConfigs());
+    mConfigRepository = mock(ConfigRepository.class);
     processFactory = new DockerProcessFactory(
         workerConfigs,
         workspaceRoot,
@@ -143,10 +158,13 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce
         .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString();
   }

-  protected AirbyteCatalog runDiscover() throws Exception {
-    return new DefaultDiscoverCatalogWorker(
+  protected UUID runDiscover() throws Exception {
+    final UUID toReturn = new DefaultDiscoverCatalogWorker(
+        mConfigRepository,
         new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements()))
-        .run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot).getDiscoverCatalog();
+        .run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot).getDiscoverCatalogId();
+    verify(mConfigRepository).writeActorCatalogFetchEvent(lastPersistedCatalog.capture(), any(), any(), any());
+    return toReturn;
   }

   protected void checkEntrypointEnvVariable() throws Exception {
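The harness mocks ConfigRepository and relies on Mockito's ArgumentCaptor to recover the catalog the worker persisted, since runDiscover now only returns an ID. For readers unfamiliar with the pattern, a minimal self-contained sketch (the Repository type here is a hypothetical stand-in, not Airbyte's ConfigRepository):

  import static org.mockito.Mockito.mock;
  import static org.mockito.Mockito.verify;

  import org.mockito.ArgumentCaptor;

  class CaptorSketch {

    // Hypothetical stand-in for ConfigRepository.
    interface Repository {

      void save(String catalog);

    }

    void demo() {
      final Repository repo = mock(Repository.class);
      repo.save("catalog-json"); // in the real test, DefaultDiscoverCatalogWorker makes this call

      // Capture the argument the code under test handed to the mock, then inspect it.
      final ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
      verify(repo).save(captor.capture());
      final String persisted = captor.getValue(); // "catalog-json"
    }

  }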
@@ -30,6 +30,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -108,7 +109,8 @@ protected boolean testCatalog() {
   public void testDataTypes() throws Exception {
     final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog();
     final List<AirbyteMessage> allMessages = runRead(catalog);
-    final Map<String, AirbyteStream> streams = runDiscover().getStreams().stream()
+    final UUID catalogId = runDiscover();
+    final Map<String, AirbyteStream> streams = getLastPersistedCatalog().getStreams().stream()
         .collect(Collectors.toMap(AirbyteStream::getName, s -> s));
     final List<AirbyteMessage> recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList();
     final Map<String, List<String>> expectedValues = new HashMap<>();
@@ -30,6 +30,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -153,9 +154,10 @@ public void testCheckConnection() throws Exception {
    */
   @Test
   public void testDiscover() throws Exception {
-    final AirbyteCatalog discoverOutput = runDiscover();
-    assertNotNull(discoverOutput, "Expected discover to produce a catalog");
-    verifyCatalog(discoverOutput);
+    final UUID discoverOutput = runDiscover();
+    final AirbyteCatalog discoveredCatalog = getLastPersistedCatalog();
+    assertNotNull(discoveredCatalog, "Expected discover to produce a catalog");
+    verifyCatalog(discoveredCatalog);
   }

   /**
@@ -220,14 +220,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
         configRepository.getActorCatalog(discoverSchemaRequestBody.getSourceId(), connectorVersion, configHash);
     final boolean bustActorCatalogCache = discoverSchemaRequestBody.getDisableCache() != null && discoverSchemaRequestBody.getDisableCache();
     if (currentCatalog.isEmpty() || bustActorCatalogCache) {
-      final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
-      final SourceDiscoverSchemaRead returnValue = discoverJobToOutput(response);
-      if (response.isSuccess()) {
-        final UUID catalogId =
-            configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash);
-        returnValue.catalogId(catalogId);
-      }
-      return returnValue;
+      final SynchronousResponse<UUID> persistedCatalogId = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion);
+      return retrieveDiscoveredSchema(persistedCatalogId);
     }
     final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class);
     final SynchronousJobRead emptyJob = new SynchronousJobRead()
@@ -257,16 +251,19 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So
     final SourceConnection source = new SourceConnection()
         .withSourceDefinitionId(sourceCreate.getSourceDefinitionId())
         .withConfiguration(partialConfig);
-    final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
-    return discoverJobToOutput(response);
+    final SynchronousResponse<UUID> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag());
+    return retrieveDiscoveredSchema(response);
   }

-  private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse<AirbyteCatalog> response) {
+  private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousResponse<UUID> response) throws ConfigNotFoundException, IOException {
     final SourceDiscoverSchemaRead sourceDiscoverSchemaRead = new SourceDiscoverSchemaRead()
         .jobInfo(jobConverter.getSynchronousJobRead(response));

     if (response.isSuccess()) {
-      sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(response.getOutput()));
+      ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput());
+      final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(),
+          io.airbyte.protocol.models.AirbyteCatalog.class);
+      sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog));
     }

     return sourceDiscoverSchemaRead;
DefaultSynchronousSchedulerClient.java:

@@ -6,6 +6,10 @@

 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.lang.Exceptions;
 import io.airbyte.config.ConnectorJobOutput;
 import io.airbyte.config.DestinationConnection;
@@ -20,7 +24,6 @@
 import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
 import io.airbyte.persistence.job.tracker.JobTracker;
 import io.airbyte.persistence.job.tracker.JobTracker.JobState;
-import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.ConnectorSpecification;
 import io.airbyte.workers.temporal.TemporalClient;
 import io.airbyte.workers.temporal.TemporalResponse;
@@ -38,6 +41,8 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl

   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class);

+  private static final HashFunction HASH_FUNCTION = Hashing.md5();
+
   private final TemporalClient temporalClient;
   private final JobTracker jobTracker;
   private final JobErrorReporter jobErrorReporter;

Review comment (Contributor), on HASH_FUNCTION: note: there are now multiple instances of this floating about. We should centralise this. This can happen in a follow-up PR. It definitely needs to happen; otherwise we can bump heads against tricky cache invalidation issues.
@@ -101,15 +106,19 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
   }

   @Override
-  public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage)
+  public SynchronousResponse<UUID> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage, final String connectorVersion)
       throws IOException {
     final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
         source.getSourceDefinitionId(),
         source.getWorkspaceId(),
         source.getConfiguration());
     final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig()
         .withConnectionConfiguration(sourceConfiguration)
-        .withDockerImage(dockerImage);
+        .withDockerImage(dockerImage)
+        .withSourceId(source.getSourceId().toString())
+        .withConfigHash(HASH_FUNCTION.hashBytes(Jsons.serialize(source.getConfiguration()).getBytes(
+            Charsets.UTF_8)).toString())
+        .withConnectorVersion(connectorVersion);

     final UUID jobId = UUID.randomUUID();
     final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);

Review comment (Contributor), on the config-hash expression: This feels like the kind of thing that should maybe go into a helper or util class, and we can add some javadoc to explain why and how we're hashing the config. Otherwise I'd worry that this starts getting copy/pasted elsewhere and becomes hard to change if we ever need to.

Reply (Contributor Author): Yeah, this is a good call - Davin opened https://github.com/airbytehq/airbyte/issues/17488.
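Following the thread above (tracked in issue #17488), the hashing could live in a small utility; a hypothetical sketch, not part of this PR, with an invented class name:

  import com.fasterxml.jackson.databind.JsonNode;
  import com.google.common.base.Charsets;
  import com.google.common.hash.HashFunction;
  import com.google.common.hash.Hashing;
  import io.airbyte.commons.json.Jsons;

  // Hypothetical helper. Centralising the hash matters because the MD5 of the
  // serialized config is the cache key for discovered catalogs: if two call sites
  // ever hash differently, cached catalogs are silently missed or wrongly reused.
  public final class SourceConfigHasher {

    private static final HashFunction HASH_FUNCTION = Hashing.md5();

    private SourceConfigHasher() {}

    public static String hashConfig(final JsonNode configuration) {
      return HASH_FUNCTION.hashBytes(Jsons.serialize(configuration).getBytes(Charsets.UTF_8)).toString();
    }

  }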
@@ -119,7 +128,7 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
         jobReportingContext,
         source.getSourceDefinitionId(),
         () -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig),
-        ConnectorJobOutput::getDiscoverCatalog,
+        ConnectorJobOutput::getDiscoverCatalogId,
         source.getWorkspaceId());
   }
SynchronousSchedulerClient.java:

@@ -7,9 +7,9 @@
 import io.airbyte.config.DestinationConnection;
 import io.airbyte.config.SourceConnection;
 import io.airbyte.config.StandardCheckConnectionOutput;
-import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.ConnectorSpecification;
 import java.io.IOException;
+import java.util.UUID;

 /**
  * Exposes a way of executing short-lived jobs as RPC calls. Blocks until the job completes. No

@@ -23,7 +23,7 @@ SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJo
   SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(DestinationConnection destination, String dockerImage)
       throws IOException;

-  SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(SourceConnection source, String dockerImage) throws IOException;
+  SynchronousResponse<UUID> createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException;

   SynchronousResponse<ConnectorSpecification> createGetSpecJob(String dockerImage) throws IOException;
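Taken together, the interface change means callers move a UUID across the scheduler boundary instead of a potentially very large catalog, and rehydrate the catalog from persistence on demand. A sketch of the consumer side, mirroring retrieveDiscoveredSchema above (the synchronousSchedulerClient and configRepository wiring is assumed, as in the handler code):

  final SynchronousResponse<UUID> response =
      synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion);
  if (response.isSuccess()) {
    // Only the ID travelled back from the job; the catalog body lives in the database.
    final ActorCatalog persisted = configRepository.getActorCatalogById(response.getOutput());
    final AirbyteCatalog catalog = Jsons.object(persisted.getCatalog(), AirbyteCatalog.class);
    // ... hand to CatalogConverter.toApi(catalog) for the API layer.
  }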