Fix record count and add acceptance test to the new scheduler #9487

Merged: 4 commits, Jan 20, 2022
4 changes: 4 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -88,3 +88,7 @@ MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
MAX_CHECK_WORKERS=5
MAX_DISCOVER_WORKERS=5


Contributor (suggested change):

### FEATURE FLAGS ###

Contributor Author:

Done

### FEATURE FLAGS ###
NEW_SCHEDULER=false
3 changes: 3 additions & 0 deletions .github/workflows/gradle.yml
@@ -261,6 +261,9 @@ jobs:
- name: Run End-to-End Acceptance Tests
run: ./tools/bin/acceptance_test.sh

- name: Run End-to-End Acceptance Tests with the new scheduler
run: ./tools/bin/acceptance_test_with_new_scheduler.sh

- name: Automatic Migration Acceptance Test
run: SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i

@@ -8,7 +8,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

@Override
public boolean usesNewScheduler() {
return System.getenv().containsKey("NEW_SCHEDULER");
return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER"));
}

}
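The one-line change above alters the flag's semantics: with `containsKey`, setting `NEW_SCHEDULER=false` still enabled the new scheduler, because only the variable's presence was checked, not its value. A minimal sketch of the difference (`FlagCheck` is a hypothetical class for illustration, not part of the PR):

```java
import java.util.Map;

public class FlagCheck {
    // Old behavior: the flag is "on" whenever the variable exists, even if set to "false".
    static boolean oldCheck(Map<String, String> env) {
        return env.containsKey("NEW_SCHEDULER");
    }

    // New behavior: the flag is on only when the value parses as true (case-insensitive);
    // Boolean.parseBoolean(null) safely returns false when the variable is unset.
    static boolean newCheck(Map<String, String> env) {
        return Boolean.parseBoolean(env.get("NEW_SCHEDULER"));
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("NEW_SCHEDULER", "false");
        System.out.println(oldCheck(env)); // true  -- the bug
        System.out.println(newCheck(env)); // false -- the fix
    }
}
```

This is also why the `.env` change above defaults the variable to `NEW_SCHEDULER=false`: under the new parsing, an explicit `false` value is safe.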
@@ -558,6 +558,10 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ

@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody.getConnectionId()));
}

return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
}

@@ -160,6 +160,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)

if (featureFlags.usesNewScheduler()) {
try {
LOGGER.info("Starting a connection using the new scheduler");
temporalWorkerRunFactory.createNewSchedulerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the temporal connection manager workflow failed", e);
@@ -347,6 +347,18 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ
return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}

final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID connectionId = connectionIdRequestBody.getConnectionId();
@@ -456,13 +468,14 @@ public JobInfoRead createManualRun(final UUID connectionId) throws IOException {
public JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));
final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
if (cancellationSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
}

return jobConverter.getJobInfoRead(job);
final Job cancelledJob = jobPersistence.getJob(id);
return jobConverter.getJobInfoRead(cancelledJob);
}

}
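The last part of the `createNewSchedulerCancellation` change re-fetches the job after the cancellation is submitted: the original code returned the `Job` snapshot loaded *before* cancellation, so the API response still reported the pre-cancellation status. A toy illustration of that stale-snapshot pitfall (in-memory store, not Airbyte's persistence layer):

```java
import java.util.HashMap;
import java.util.Map;

public class StaleReadDemo {
    // Immutable snapshot of a job row, standing in for Airbyte's Job model.
    record Job(long id, String status) {}

    static final Map<Long, Job> store = new HashMap<>();

    static Job getJob(long id) { return store.get(id); }

    // Cancellation replaces the stored row; snapshots taken earlier do not change.
    static void cancel(long id) { store.put(id, new Job(id, "cancelled")); }

    public static void main(String[] args) {
        store.put(1L, new Job(1L, "running"));
        Job before = getJob(1L);   // snapshot taken before cancellation
        cancel(1L);
        Job after = getJob(1L);    // re-fetch, as the fixed code now does
        System.out.println(before.status()); // running  -- stale
        System.out.println(after.status());  // cancelled
    }
}
```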
@@ -208,7 +208,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest())
.memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit()));

when(schedulerHandler.resetConnection(any())).thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED)));
when(schedulerHandler.resetConnection(any(ConnectionIdRequestBody.class)))
.thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED)));
}

@Test
3 changes: 3 additions & 0 deletions airbyte-tests/build.gradle
@@ -41,13 +41,16 @@ dependencies {

acceptanceTestsImplementation project(':airbyte-api')
acceptanceTestsImplementation project(':airbyte-commons')
acceptanceTestsImplementation project(':airbyte-config:models')
acceptanceTestsImplementation project(':airbyte-config:persistence')
acceptanceTestsImplementation project(':airbyte-db:lib')
acceptanceTestsImplementation project(':airbyte-tests')
acceptanceTestsImplementation project(':airbyte-test-utils')
acceptanceTestsImplementation project(':airbyte-workers')

acceptanceTestsImplementation 'com.fasterxml.jackson.core:jackson-databind'
acceptanceTestsImplementation 'io.github.cdimascio:java-dotenv:3.0.0'
acceptanceTestsImplementation 'io.temporal:temporal-sdk:1.6.0'
acceptanceTestsImplementation 'org.apache.commons:commons-csv:1.4'
acceptanceTestsImplementation 'org.testcontainers:postgresql:1.15.3'
acceptanceTestsImplementation 'org.postgresql:postgresql:42.2.18'
@@ -68,6 +68,8 @@
import io.airbyte.api.client.model.SourceIdRequestBody;
import io.airbyte.api.client.model.SourceRead;
import io.airbyte.api.client.model.SyncMode;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.resources.MoreResources;
@@ -171,6 +173,8 @@ public class AcceptanceTests {
private List<UUID> destinationIds;
private List<UUID> operationIds;

private static FeatureFlags featureFlags;

@SuppressWarnings("UnstableApiUsage")
@BeforeAll
public static void init() throws URISyntaxException, IOException, InterruptedException {
@@ -203,6 +207,8 @@ public static void init() throws URISyntaxException, IOException, InterruptedExc
} else {
LOGGER.info("Using external deployment of airbyte.");
}

featureFlags = new EnvVariableFeatureFlags();
}

@AfterAll
@@ -467,7 +473,10 @@ public void testManualSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
@@ -486,7 +495,10 @@ public void testCancelSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

@@ -519,7 +531,10 @@ public void testIncrementalSync() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
LOGGER.info("Beginning testIncrementalSync() sync 1");
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -550,21 +565,31 @@ public void testIncrementalSync() throws Exception {
assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME));

// reset back to no data.

LOGGER.info("Starting testIncrementalSync() reset");
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
FeatureFlags featureFlags = new EnvVariableFeatureFlags();
if (featureFlags.usesNewScheduler()) {
waitForJob(apiClient.getJobsApi(), jobInfoRead.getJob(),
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));
} else {
waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
}

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", STREAM_NAME));
assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public",
STREAM_NAME));

// sync one more time. verify it is the equivalent of a full refresh.
LOGGER.info("Starting testIncrementalSync() sync 3");
final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

assertSourceAndDestinationDbInSync(false);

}

@Test
@@ -613,7 +638,10 @@ public void testMultipleSchemasAndTablesSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
@@ -743,7 +771,10 @@ public void testCheckpointing() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId();

// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

@@ -834,7 +865,10 @@ public void testBackpressure() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();

// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

@@ -912,6 +946,10 @@ public void testFailureTimeout() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();
// Avoid race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -1315,4 +1353,17 @@ public enum Type {
DESTINATION
}

private static void waitForTemporalWorkflow(final UUID connectionId) {
    /*
     * do {
     *   try {
     *     Thread.sleep(1000);
     *   } catch (InterruptedException e) {
     *     throw new RuntimeException(e);
     *   }
     * } while (temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId)));
     */
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
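The fixed `Thread.sleep(10 * 1000)` in `waitForTemporalWorkflow` is a stopgap; the commented-out loop above it shows the intent of polling `temporalClient.isWorkflowRunning(...)` instead. One way to restore polling without an unbounded loop is a generic wait-until helper with a deadline. This is a sketch under that assumption, not the PR's code, and `WaitUtil` is a hypothetical name:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

public class WaitUtil {
    // Polls the condition at a fixed interval until it is true or the timeout elapses.
    // Returns the condition's final value, so callers can distinguish success from timeout.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        final Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(interval.toMillis());
        }
        return condition.getAsBoolean();
    }
}
```

A caller here might be `waitUntil(() -> temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId)), Duration.ofSeconds(30), Duration.ofSeconds(1))`, which bounds the wait instead of sleeping a fixed 10 seconds.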
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
@@ -21,6 +21,7 @@ dependencies {

implementation project(':airbyte-analytics')
implementation project(':airbyte-api')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db:lib')
32 changes: 30 additions & 2 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -4,6 +4,7 @@

package io.airbyte.workers;

import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
@@ -24,11 +25,14 @@
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubePortManagerSingleton;
@@ -100,6 +104,8 @@ public class WorkerApp {
private final Configs configs;
private final ConnectionHelper connectionHelper;
private final boolean containerOrchestratorEnabled;
private final JobNotifier jobNotifier;
private final JobTracker jobTracker;

public void start() {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
@@ -187,6 +193,8 @@ public void start() {

syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);

final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository);

final Worker connectionUpdaterWorker =
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SyncWorkflowImpl.class);
@@ -198,7 +206,11 @@
jobPersistence,
temporalWorkerRunFactory,
workerEnvironment,
logConfigs),
logConfigs,
jobNotifier,
jobTracker,
configRepository,
jobCreator),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper),
replicationActivity,
@@ -345,6 +357,12 @@ public static void main(final String[] args) throws IOException, InterruptedExce
.getInitialized();

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
new Deployment(configs.getDeploymentMode(), jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()),
configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence, configRepository),
@@ -372,6 +390,14 @@ public static void main(final String[] args) throws IOException, InterruptedExce
workspaceHelper,
workerConfigs);

final JobNotifier jobNotifier = new JobNotifier(
configs.getWebappUrl(),
configRepository,
workspaceHelper,
TrackingClientSingleton.get());
Contributor:

I'm not seeing this being initialised? Am I missing something?

Contributor Author:

It is initialized in the get method:

public static TrackingClient get() {
    synchronized (lock) {
      if (trackingClient == null) {
        initialize();
      }
      return trackingClient;
    }
  }

Contributor:

Ah, I was referring to properly initialising it with this call: https://github.com/airbytehq/airbyte/blob/master/airbyte-analytics/src/main/java/io/airbyte/analytics/TrackingClientSingleton.java#L39, e.g. https://github.com/airbytehq/airbyte/blob/master/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java#L259.

I think this is needed, otherwise we won't be sending events to Segment. If you look at the Scheduler code, the linked code is called before we call get. I'm assuming we want to do the same here.

Contributor Author:

Thanks for catching it. Done.


final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);

new WorkerApp(
workspaceRoot,
jobProcessFactory,
@@ -392,7 +418,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce
temporalWorkerRunFactory,
configs,
connectionHelper,
configs.getContainerOrchestratorEnabled()).start();
configs.getContainerOrchestratorEnabled(),
jobNotifier,
jobTracker).start();
}

}