diff --git a/.env b/.env index b21d58434f634..0f43c07e94ebf 100644 --- a/.env +++ b/.env @@ -88,3 +88,7 @@ MAX_SYNC_WORKERS=5 MAX_SPEC_WORKERS=5 MAX_CHECK_WORKERS=5 MAX_DISCOVER_WORKERS=5 + + +### FEATURE FLAGS ### +NEW_SCHEDULER=false diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index c5c53c331a1b9..533668cdf38a8 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -261,6 +261,9 @@ jobs: - name: Run End-to-End Acceptance Tests run: ./tools/bin/acceptance_test.sh + - name: Run End-to-End Acceptance Tests with the new scheduler + run: ./tools/bin/acceptance_test_with_new_scheduler.sh + - name: Automatic Migration Acceptance Test run: SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index eb6047044a12e..b89ca268c9eba 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -8,7 +8,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags { @Override public boolean usesNewScheduler() { - return System.getenv().containsKey("NEW_SCHEDULER"); + return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER")); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 05ec930a8ff70..c2280bfd7438e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -558,6 +558,10 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ @Override public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) { + if (featureFlags.usesNewScheduler()) { + return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody.getConnectionId())); + } + return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 0f5ffa9fb66ba..755820d9df519 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -160,6 +160,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) if (featureFlags.usesNewScheduler()) { try { + LOGGER.info("Starting a connection using the new scheduler"); temporalWorkerRunFactory.createNewSchedulerWorkflow(connectionId); } catch (final Exception e) { LOGGER.error("Start of the temporal connection manager workflow failed", e); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 3adf012c3e5bd..a56a2107f206f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -347,6 +347,18 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ return jobConverter.getJobInfoRead(job); } + public JobInfoRead resetConnection(final UUID connectionId) throws IOException { + final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId); + + if (manualSyncSubmissionResult.getFailingReason().isPresent()) { + throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get()); + } + + final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get()); + + return jobConverter.getJobInfoRead(job); + } + public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException, JsonValidationException, ConfigNotFoundException { final UUID connectionId = connectionIdRequestBody.getConnectionId(); @@ -456,13 +468,14 @@ public JobInfoRead createManualRun(final UUID connectionId) throws IOException { public JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException { final Job job = jobPersistence.getJob(id); - final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope())); + final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope())); - if (manualSyncSubmissionResult.getFailingReason().isPresent()) { - throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get()); + if (cancellationSubmissionResult.getFailingReason().isPresent()) { + throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get()); } - return jobConverter.getJobInfoRead(job); + final Job cancelledJob = jobPersistence.getJob(id); + return jobConverter.getJobInfoRead(cancelledJob); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 3dcb73d2154e9..87f14345a3bc6 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -208,7 +208,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE .memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()) .memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit())); - when(schedulerHandler.resetConnection(any())).thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED))); + when(schedulerHandler.resetConnection(any(ConnectionIdRequestBody.class))) + .thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED))); } @Test diff --git a/airbyte-tests/build.gradle b/airbyte-tests/build.gradle index 5312be73c4058..6f2a572c9bc1f 100644 --- a/airbyte-tests/build.gradle +++ b/airbyte-tests/build.gradle @@ -41,13 +41,16 @@ dependencies { acceptanceTestsImplementation project(':airbyte-api') acceptanceTestsImplementation project(':airbyte-commons') + acceptanceTestsImplementation project(':airbyte-config:models') acceptanceTestsImplementation project(':airbyte-config:persistence') acceptanceTestsImplementation project(':airbyte-db:lib') acceptanceTestsImplementation project(':airbyte-tests') acceptanceTestsImplementation project(':airbyte-test-utils') + acceptanceTestsImplementation project(':airbyte-workers') acceptanceTestsImplementation 'com.fasterxml.jackson.core:jackson-databind' acceptanceTestsImplementation 'io.github.cdimascio:java-dotenv:3.0.0' + acceptanceTestsImplementation 'io.temporal:temporal-sdk:1.6.0' acceptanceTestsImplementation 'org.apache.commons:commons-csv:1.4' acceptanceTestsImplementation 'org.testcontainers:postgresql:1.15.3' acceptanceTestsImplementation 'org.postgresql:postgresql:42.2.18' diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 4ac7009144a8e..e873d488e2082 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -68,6 +68,8 @@ import io.airbyte.api.client.model.SourceIdRequestBody; import io.airbyte.api.client.model.SourceRead; import io.airbyte.api.client.model.SyncMode; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.commons.resources.MoreResources; @@ -171,6 +173,8 @@ public class AcceptanceTests { private List destinationIds; private List operationIds; + private static FeatureFlags featureFlags; + @SuppressWarnings("UnstableApiUsage") @BeforeAll public static void init() throws URISyntaxException, IOException, InterruptedException { @@ -203,6 +207,8 @@ public static void init() throws URISyntaxException, IOException, InterruptedExc } else { LOGGER.info("Using external deployment of airbyte."); } + + featureFlags = new EnvVariableFeatureFlags(); } @AfterAll @@ -467,7 +473,10 @@ public void testManualSync() throws Exception { catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); assertSourceAndDestinationDbInSync(false); @@ -486,7 +495,10 @@ public void testCancelSync() throws Exception { catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING)); @@ -519,7 +531,10 @@ public void testIncrementalSync() throws Exception { .destinationSyncMode(destinationSyncMode)); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } LOGGER.info("Beginning testIncrementalSync() sync 1"); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -550,21 +565,31 @@ public void testIncrementalSync() throws Exception { assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME)); // reset back to no data. + LOGGER.info("Starting testIncrementalSync() reset"); final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob()); + FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + waitForJob(apiClient.getJobsApi(), jobInfoRead.getJob(), + Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED)); + } else { + waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob()); + } + LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", STREAM_NAME)); + assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", + STREAM_NAME)); // sync one more time. verify it is the equivalent of a full refresh. LOGGER.info("Starting testIncrementalSync() sync 3"); - final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final JobInfoRead connectionSyncRead3 = + apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); assertSourceAndDestinationDbInSync(false); + } @Test @@ -613,7 +638,10 @@ public void testMultipleSchemasAndTablesSync() throws Exception { catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); assertSourceAndDestinationDbInSync(false); @@ -743,7 +771,10 @@ public void testCheckpointing() throws Exception { .destinationSyncMode(destinationSyncMode)); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId(); - + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -834,7 +865,10 @@ public void testBackpressure() throws Exception { final UUID connectionId = createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null) .getConnectionId(); - + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -912,6 +946,10 @@ public void testFailureTimeout() throws Exception { final UUID connectionId = createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null) .getConnectionId(); + // Avoid Race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -1315,4 +1353,17 @@ public enum Type { DESTINATION } + private static void waitForTemporalWorkflow(final UUID connectionId) { + /* + * do { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); + * } } while + * (temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId))); + */ + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 42ac0bd19ee1e..1b097f48fe996 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -21,6 +21,7 @@ dependencies { implementation project(':airbyte-analytics') implementation project(':airbyte-api') + implementation project(':airbyte-commons-docker') implementation project(':airbyte-config:models') implementation project(':airbyte-config:persistence') implementation project(':airbyte-db:lib') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 5057e5463c051..5bf166662218f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -4,6 +4,7 @@ package io.airbyte.workers; +import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.commons.features.EnvVariableFeatureFlags; @@ -24,11 +25,14 @@ import io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.scheduler.persistence.DefaultJobCreator; import io.airbyte.scheduler.persistence.DefaultJobPersistence; +import io.airbyte.scheduler.persistence.JobCreator; +import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory; import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; +import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.workers.helper.ConnectionHelper; import io.airbyte.workers.process.DockerProcessFactory; import io.airbyte.workers.process.KubePortManagerSingleton; @@ -100,6 +104,8 @@ public class WorkerApp { private final Configs configs; private final ConnectionHelper connectionHelper; private final boolean containerOrchestratorEnabled; + private final JobNotifier jobNotifier; + private final JobTracker jobTracker; public void start() { final Map mdc = MDC.getCopyOfContextMap(); @@ -187,6 +193,8 @@ public void start() { syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); + final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository); + final Worker connectionUpdaterWorker = factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SyncWorkflowImpl.class); @@ -198,7 +206,11 @@ public void start() { jobPersistence, temporalWorkerRunFactory, workerEnvironment, - logConfigs), + logConfigs, + jobNotifier, + jobTracker, + configRepository, + jobCreator), new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()), new ConnectionDeletionActivityImpl(connectionHelper), replicationActivity, @@ -345,6 +357,12 @@ public static void main(final String[] args) throws IOException, InterruptedExce .getInitialized(); final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); + TrackingClientSingleton.initialize( + configs.getTrackingStrategy(), + new Deployment(configs.getDeploymentMode(), jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()), + configs.getAirbyteRole(), + configs.getAirbyteVersion(), + configRepository); final TrackingClient trackingClient = TrackingClientSingleton.get(); final SyncJobFactory jobFactory = new DefaultSyncJobFactory( new DefaultJobCreator(jobPersistence, configRepository), @@ -372,6 +390,14 @@ public static void main(final String[] args) throws IOException, InterruptedExce workspaceHelper, workerConfigs); + final JobNotifier jobNotifier = new JobNotifier( + configs.getWebappUrl(), + configRepository, + workspaceHelper, + TrackingClientSingleton.get()); + + final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); + new WorkerApp( workspaceRoot, jobProcessFactory, @@ -392,7 +418,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce temporalWorkerRunFactory, configs, connectionHelper, - configs.getContainerOrchestratorEnabled()).start(); + configs.getContainerOrchestratorEnabled(), + jobNotifier, + jobTracker).start(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index d6d116a4eec99..f2ce3fd83e06b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -216,12 +216,10 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) { final ConnectionManagerWorkflow connectionManagerWorkflow = getWorkflowOptionsWithWorkflowId(ConnectionManagerWorkflow.class, TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)); final BatchRequest signalRequest = client.newSignalWithStartRequest(); - final ConnectionUpdaterInput input = new ConnectionUpdaterInput(connectionId, null, null, false, 1, null); + final ConnectionUpdaterInput input = new ConnectionUpdaterInput(connectionId, null, null, false, 1, null, false); signalRequest.add(connectionManagerWorkflow::run, input); WorkflowClient.start(connectionManagerWorkflow::run, input); - - log.info("Scheduler temporal wf started"); } public void deleteConnection(final UUID connectionId) { @@ -300,6 +298,7 @@ public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { final boolean isWorflowRunning = isWorkflowRunning(getConnectionManagerName(connectionId)); if (!isWorflowRunning) { + log.error("Can't cancel a non running workflow"); return new ManualSyncSubmissionResult( Optional.of("No scheduler workflow is running for: " + connectionId), Optional.empty()); @@ -320,7 +319,45 @@ public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { } } while (connectionManagerWorkflow.getState().isRunning()); - log.info("end of manual schedule"); + log.info("end of manual cancellation"); + + final long jobId = connectionManagerWorkflow.getJobInformation().getJobId(); + + return new ManualSyncSubmissionResult( + Optional.empty(), + Optional.of(jobId)); + } + + public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { + log.info("reset sync request"); + + final boolean isWorflowRunning = isWorkflowRunning(getConnectionManagerName(connectionId)); + + if (!isWorflowRunning) { + log.error("Can't reset a non running workflow"); + return new ManualSyncSubmissionResult( + Optional.of("No scheduler workflow is running for: " + connectionId), + Optional.empty()); + } + + final ConnectionManagerWorkflow connectionManagerWorkflow = + getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); + + final long oldJobId = connectionManagerWorkflow.getJobInformation().getJobId(); + + connectionManagerWorkflow.resetConnection(); + + do { + try { + Thread.sleep(DELAY_BETWEEN_QUERY_MS); + } catch (final InterruptedException e) { + return new ManualSyncSubmissionResult( + Optional.of("Didn't manage to reset a sync for: " + connectionId), + Optional.empty()); + } + } while (connectionManagerWorkflow.getJobInformation().getJobId() == oldJobId); + + log.info("end of reset"); final long jobId = connectionManagerWorkflow.getJobInformation().getJobId(); @@ -405,7 +442,7 @@ public boolean isWorkflowRunning(final String workflowName) { } @VisibleForTesting - static String getConnectionManagerName(final UUID connectionId) { + public static String getConnectionManagerName(final UUID connectionId) { return "connection_manager_" + connectionId; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java index 41b8c4131b7a1..098e0c580e28e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java @@ -52,6 +52,9 @@ public interface ConnectionManagerWorkflow { @SignalMethod void connectionUpdated(); + @SignalMethod + void resetConnection(); + /** * Return the current state of the workflow. */ diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index d65046e3b9f2a..058bb927188d1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -4,6 +4,9 @@ package io.airbyte.workers.temporal.scheduling; +import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StandardSyncSummary.ReplicationStatus; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; @@ -23,6 +26,7 @@ import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInput; +import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.ReportJobStartInput; import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration; import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.scheduling.state.listener.NoopStateListener; @@ -49,6 +53,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow Optional maybeJobId = Optional.empty(); Optional maybeAttemptId = Optional.empty(); + Optional standardSyncOutput = Optional.empty(); + private final GenerateInputActivity getSyncInputActivity = Workflow.newActivityStub(GenerateInputActivity.class, ActivityConfiguration.OPTIONS); private final JobCreationAndStatusUpdateActivity jobCreationAndStatusUpdateActivity = Workflow.newActivityStub(JobCreationAndStatusUpdateActivity.class, ActivityConfiguration.OPTIONS); @@ -78,20 +84,26 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr // Job and attempt creation maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput( - connectionUpdaterInput.getConnectionId())); + connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); + connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); return Optional.ofNullable(jobCreationOutput.getJobId()); }); maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput( jobId)); + connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); return attemptCreationOutput.getAttemptId(); })); // Sync workflow final SyncInput getSyncInputActivitySyncInput = new SyncInput( maybeAttemptId.get(), - maybeJobId.get()); + maybeJobId.get(), + workflowState.isResetConnection()); + + jobCreationAndStatusUpdateActivity.reportJobStart(new ReportJobStartInput( + maybeJobId.get())); final SyncOutput syncWorkflowInputs = getSyncInputActivity.getSyncWorkflowInput(getSyncInputActivitySyncInput); @@ -108,12 +120,18 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr final UUID connectionId = connectionUpdaterInput.getConnectionId(); try { - childSync.run( + standardSyncOutput = Optional.ofNullable(childSync.run( syncWorkflowInputs.getJobRunConfig(), syncWorkflowInputs.getSourceLauncherConfig(), syncWorkflowInputs.getDestinationLauncherConfig(), syncWorkflowInputs.getSyncInput(), - connectionId); + connectionId)); + + StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary(); + + if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { + workflowState.setFailed(true); + } } catch (final ChildWorkflowFailure childWorkflowFailure) { if (!(childWorkflowFailure.getCause() instanceof CanceledFailure)) { throw childWorkflowFailure; @@ -128,6 +146,16 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr // The naming is very misleading, it is not a failure but the expected behavior... } + if (workflowState.isResetConnection()) { + connectionUpdaterInput.setResetConnection(true); + connectionUpdaterInput.setJobId(null); + connectionUpdaterInput.setAttemptNumber(1); + connectionUpdaterInput.setFromFailure(false); + connectionUpdaterInput.setAttemptId(null); + } else { + connectionUpdaterInput.setResetConnection(false); + } + if (workflowState.isUpdated()) { log.error("A connection configuration has changed for the connection {}. The job will be restarted", connectionUpdaterInput.getConnectionId()); @@ -139,6 +167,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } else if (workflowState.isCancelled()) { jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput( maybeJobId.get())); + } else if (workflowState.isFailed()) { + reportFailure(connectionUpdaterInput); } else { // report success reportSuccess(connectionUpdaterInput); @@ -155,7 +185,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) { jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput( maybeJobId.get(), - maybeAttemptId.get())); + maybeAttemptId.get(), + standardSyncOutput.orElse(null))); connectionUpdaterInput.setJobId(null); connectionUpdaterInput.setAttemptNumber(1); @@ -176,7 +207,8 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) connectionUpdaterInput.setFromFailure(true); } else { jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput( - connectionUpdaterInput.getJobId())); + connectionUpdaterInput.getJobId(), + "Job failed after too many retries")); Workflow.await(Duration.ofMinutes(1), () -> skipScheduling()); @@ -217,6 +249,14 @@ public void connectionUpdated() { workflowState.setUpdated(true); } + @Override + public void resetConnection() { + if (!workflowState.isRunning()) { + cancelJob(); + } + workflowState.setResetConnection(true); + } + @Override public WorkflowState getState() { return workflowState; @@ -230,14 +270,15 @@ public JobInformation getJobInformation() { } private Boolean skipScheduling() { - return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated(); + return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated() || workflowState.isResetConnection(); } private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput) { // Continue the workflow as new connectionUpdaterInput.setAttemptId(null); + boolean isDeleted = workflowState.isDeleted(); workflowState.reset(); - if (!workflowState.isDeleted()) { + if (!isDeleted) { Workflow.continueAsNew(connectionUpdaterInput); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java index e556fbcff6d4f..b2c0dc701c749 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java @@ -27,5 +27,6 @@ public class ConnectionUpdaterInput { private int attemptNumber; @Nullable private WorkflowState workflowState; + private boolean resetConnection; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivity.java index c8b023dfffef2..75e5cf0d18b38 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivity.java @@ -23,6 +23,7 @@ class SyncInput { private int attemptId; private long jobId; + private boolean reset; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 8c1fe1ecb67fd..6704b33810a37 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -4,11 +4,15 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncInput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.exception.RetryableException; import lombok.AllArgsConstructor; @@ -23,7 +27,22 @@ public SyncOutput getSyncWorkflowInput(final SyncInput input) { try { final long jobId = input.getJobId(); final int attempt = input.getAttemptId(); - final JobSyncConfig config = jobPersistence.getJob(jobId).getConfig().getSync(); + final Job job = jobPersistence.getJob(jobId); + JobSyncConfig config = job.getConfig().getSync(); + if (input.isReset()) { + final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection(); + config = new JobSyncConfig() + .withNamespaceDefinition(resetConnection.getNamespaceDefinition()) + .withNamespaceFormat(resetConnection.getNamespaceFormat()) + .withPrefix(resetConnection.getPrefix()) + .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) + .withDestinationDockerImage(resetConnection.getDestinationDockerImage()) + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(resetConnection.getDestinationConfiguration()) + .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) + .withOperationSequence(resetConnection.getOperationSequence()) + .withResourceRequirements(resetConnection.getResourceRequirements()); + } final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt); @@ -49,6 +68,7 @@ public SyncOutput getSyncWorkflowInput(final SyncInput input) { .withResourceRequirements(config.getResourceRequirements()); return new SyncOutput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); + } catch (final Exception e) { throw new RetryableException(e); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java index 44e70b17aba3d..385f952eb56a8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.config.StandardSyncOutput; import io.airbyte.workers.temporal.exception.RetryableException; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; @@ -21,6 +22,7 @@ public interface JobCreationAndStatusUpdateActivity { class JobCreationInput { private UUID connectionId; + private boolean reset; } @@ -76,6 +78,7 @@ class JobSuccessInput { private long jobId; private int attemptId; + private StandardSyncOutput standardSyncOutput; } @@ -91,6 +94,7 @@ class JobSuccessInput { class JobFailureInput { private long jobId; + private String reason; } @@ -131,4 +135,16 @@ class JobCancelledInput { @ActivityMethod void jobCancelled(JobCancelledInput input); + @Data + @NoArgsConstructor + @AllArgsConstructor + class ReportJobStartInput { + + private long jobId; + + } + + @ActivityMethod + void reportJobStart(ReportJobStartInput reportJobStartInput); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index f91fffdbdb675..bfd122a8d8089 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -4,17 +4,35 @@ package io.airbyte.workers.temporal.scheduling.activities; +import com.google.common.collect.Lists; +import io.airbyte.commons.docker.DockerUtils; +import io.airbyte.commons.enums.Enums; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.persistence.JobCreator; +import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; +import io.airbyte.scheduler.persistence.job_tracker.JobTracker; +import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; +import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.JobStatus; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.airbyte.workers.worker_run.WorkerRun; import java.io.IOException; import java.nio.file.Path; +import java.util.List; +import java.util.Optional; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -27,14 +45,47 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta private final TemporalWorkerRunFactory temporalWorkerRunFactory; private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; + private final JobNotifier jobNotifier; + private final JobTracker jobTracker; + private final ConfigRepository configRepository; + private final JobCreator jobCreator; @Override public JobCreationOutput createNewJob(final JobCreationInput input) { - final long jobId = jobFactory.create(input.getConnectionId()); + try { + if (input.isReset()) { + final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); + + final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); + + final StandardDestinationDefinition destinationDef = + configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()); + final String destinationImageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()); + + final List standardSyncOperations = Lists.newArrayList(); + for (final var operationId : standardSync.getOperationIds()) { + final StandardSyncOperation standardSyncOperation = configRepository.getStandardSyncOperation(operationId); + standardSyncOperations.add(standardSyncOperation); + } + + final Optional jobIdOptional = + jobCreator.createResetConnectionJob(destination, standardSync, destinationImageName, standardSyncOperations); - log.info("New job created, with id: " + jobId); + final long jobId = jobIdOptional.isEmpty() + ? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId() + : jobIdOptional.get(); - return new JobCreationOutput(jobId); + return new JobCreationOutput(jobId); + } else { + final long jobId = jobFactory.create(input.getConnectionId()); + + log.info("New job created, with id: " + jobId); + + return new JobCreationOutput(jobId); + } + } catch (JsonValidationException | ConfigNotFoundException | IOException e) { + throw new RetryableException(e); + } } @Override @@ -57,7 +108,16 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) @Override public void jobSuccess(final JobSuccessInput input) { try { + if (input.getStandardSyncOutput() != null) { + final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput()); + jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput); + } else { + log.warn("The job {} doesn't have an input for the attempt {}", input.getJobId(), input.getAttemptId()); + } jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptId()); + final Job job = jobPersistence.getJob(input.getJobId()); + jobNotifier.successJob(job); + trackCompletion(job, JobStatus.SUCCEEDED); } catch (final IOException e) { throw new RetryableException(e); } @@ -67,6 +127,9 @@ public void jobSuccess(final JobSuccessInput input) { public void jobFailure(final JobFailureInput input) { try { jobPersistence.failJob(input.getJobId()); + final Job job = jobPersistence.getJob(input.getJobId()); + jobNotifier.failJob(input.getReason(), job); + trackCompletion(job, JobStatus.FAILED); } catch (final IOException e) { throw new RetryableException(e); } @@ -76,6 +139,7 @@ public void jobFailure(final JobFailureInput input) { public void attemptFailure(final AttemptFailureInput input) { try { jobPersistence.failAttempt(input.getJobId(), input.getAttemptId()); + final Job job = jobPersistence.getJob(input.getJobId()); } catch (final IOException e) { throw new RetryableException(e); } @@ -85,9 +149,26 @@ public void attemptFailure(final AttemptFailureInput input) { public void jobCancelled(final JobCancelledInput input) { try { jobPersistence.cancelJob(input.getJobId()); + final Job job = jobPersistence.getJob(input.getJobId()); + trackCompletion(job, JobStatus.FAILED); + jobNotifier.failJob("Job was cancelled", job); } catch (final IOException e) { throw new RetryableException(e); } } + @Override + public void reportJobStart(final ReportJobStartInput input) { + try { + final Job job = jobPersistence.getJob(input.getJobId()); + jobTracker.trackSync(job, JobState.STARTED); + } catch (final IOException e) { + throw new RetryableException(e); + } + } + + private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus status) { + jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index c5b0242c62372..496c68b3f5c04 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -27,6 +27,8 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan private boolean skipScheduling = false; private boolean updated = false; private boolean cancelled = false; + private boolean failed = false; + private boolean resetConnection = false; public void setRunning(final boolean running) { final ChangedStateEvent event = new ChangedStateEvent( @@ -68,12 +70,30 @@ public void setCancelled(final boolean cancelled) { this.cancelled = cancelled; } + public void setFailed(final boolean failed) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.FAILED, + failed); + stateChangedListener.addEvent(id, event); + this.failed = failed; + } + + public void setResetConnection(final boolean resetConnection) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.RESET, + resetConnection); + stateChangedListener.addEvent(id, event); + this.resetConnection = resetConnection; + } + public void reset() { this.setRunning(false); this.setDeleted(false); this.setSkipScheduling(false); this.setUpdated(false); this.setCancelled(false); + this.setFailed(false); + this.setResetConnection(false); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 0e7301e975335..b84be34603ce9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -27,7 +27,9 @@ enum StateField { DELETED, RUNNING, SKIPPED_SCHEDULING, - UPDATED + UPDATED, + FAILED, + RESET } @Value diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java index af68888144044..7fb2fbe3b49bb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java @@ -59,6 +59,10 @@ public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { return temporalClient.startNewCancelation(connectionId); } + public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { + return temporalClient.resetConnection(connectionId); + } + public void deleteConnection(final UUID connectionId) { temporalClient.deleteConnection(connectionId); } @@ -72,12 +76,6 @@ public CheckedSupplier, Exception> createSupplier(fin final UUID connectionId = UUID.fromString(job.getScope()); return switch (job.getConfigType()) { case SYNC -> () -> { - - if (featureFlags.usesNewScheduler()) { - temporalClient.submitConnectionUpdaterAsync(connectionId); - - return toOutputAndStatusConnector(); - } final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, job.getConfig().getSync(), connectionId); return toOutputAndStatus(output); @@ -96,7 +94,6 @@ public CheckedSupplier, Exception> createSupplier(fin .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()); - // TODO: Signal method? final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId); return toOutputAndStatus(output); }; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 33a70b82e8e86..1266b48332c9d 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -124,7 +124,8 @@ public void runSuccess() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(124L)); @@ -156,7 +157,8 @@ public void retryAfterFail() { 1, true, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(50L)); @@ -187,7 +189,8 @@ public void manualRun() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(30L)); @@ -228,7 +231,8 @@ public void updatedSignalRecieved() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(30L)); @@ -269,7 +273,8 @@ public void cancelNonRunning() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(30L)); @@ -310,7 +315,8 @@ public void deleteSync() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofSeconds(30L)); @@ -378,7 +384,8 @@ public void manualRun() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); testEnv.sleep(Duration.ofMinutes(2L)); @@ -407,7 +414,8 @@ public void cancelRunning() { 1, false, 1, - workflowState); + workflowState, + false); WorkflowClient.start(workflow::run, input); workflow.submitManualSync(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 05a280574aac1..6cb059dee1187 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -5,11 +5,18 @@ package io.airbyte.workers.temporal.scheduling.activities; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StandardSyncSummary.ReplicationStatus; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; +import io.airbyte.scheduler.persistence.job_tracker.JobTracker; +import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; @@ -53,12 +60,22 @@ public class JobCreationAndStatusUpdateActivityTest { @Mock private LogConfigs mLogConfigs; + @Mock + private JobNotifier mJobNotifier; + + @Mock + private JobTracker mJobtracker; + @InjectMocks private JobCreationAndStatusUpdateActivityImpl jobCreationAndStatusUpdateActivity; private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final long JOB_ID = 123L; private static final int ATTEMPT_ID = 321; + private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput() + .withStandardSyncSummary( + new StandardSyncSummary() + .withStatus(ReplicationStatus.COMPLETED)); @Nested class Creation { @@ -69,7 +86,7 @@ public void createJob() { Mockito.when(mJobFactory.create(CONNECTION_ID)) .thenReturn(JOB_ID); - final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID)); + final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID, false)); Assertions.assertThat(output.getJobId()).isEqualTo(JOB_ID); } @@ -129,9 +146,13 @@ class Update { @Test public void setJobSuccess() throws IOException { - jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID)); + jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput)); + final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput); + Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput); Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); + Mockito.verify(mJobNotifier).successJob(Mockito.any()); + Mockito.verify(mJobtracker).trackSync(Mockito.any(), Mockito.eq(JobState.SUCCEEDED)); } @Test @@ -139,16 +160,17 @@ public void setJobSuccessWrapException() throws IOException { Mockito.doThrow(new IOException()) .when(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID))) + Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, null))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); } @Test public void setJobFailure() throws IOException { - jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID)); + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason")); Mockito.verify(mJobPersistence).failJob(JOB_ID); + Mockito.verify(mJobNotifier).failJob(Mockito.eq("reason"), Mockito.any()); } @Test @@ -156,7 +178,7 @@ public void setJobFailureWrapException() throws IOException { Mockito.doThrow(new IOException()) .when(mJobPersistence).failJob(JOB_ID); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID))) + Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, ""))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); } diff --git a/docker-compose.yaml b/docker-compose.yaml index 6b4e623882394..0eccd58cc660d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -71,6 +71,7 @@ services: - LOCAL_ROOT=${LOCAL_ROOT} - LOCAL_DOCKER_MOUNT=${LOCAL_DOCKER_MOUNT} - LOG_LEVEL=${LOG_LEVEL} + - NEW_SCHEDULER=${NEW_SCHEDULER} - SECRET_PERSISTENCE=${SECRET_PERSISTENCE} - SYNC_JOB_MAX_ATTEMPTS=${SYNC_JOB_MAX_ATTEMPTS} - SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS} @@ -81,8 +82,6 @@ services: - WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT} - WORKSPACE_DOCKER_MOUNT=${WORKSPACE_DOCKER_MOUNT} - WORKSPACE_ROOT=${WORKSPACE_ROOT} - # TODO: Remove before merge - # - NEW_SCHEDULER=valuedoesntmatter volumes: - data:${CONFIG_ROOT} - workspace:${WORKSPACE_ROOT} @@ -93,6 +92,7 @@ services: container_name: airbyte-worker restart: unless-stopped environment: + - AIRBYTE_VERSION=${VERSION} - CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-} - CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-} - CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-} @@ -116,6 +116,7 @@ services: - SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS} - TEMPORAL_HOST=${TEMPORAL_HOST} - TRACKING_STRATEGY=${TRACKING_STRATEGY} + - WEBAPP_URL=${WEBAPP_URL} - WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT} - WORKSPACE_DOCKER_MOUNT=${WORKSPACE_DOCKER_MOUNT} - WORKSPACE_ROOT=${WORKSPACE_ROOT} @@ -145,13 +146,13 @@ services: - JOB_MAIN_CONTAINER_MEMORY_REQUEST=${JOB_MAIN_CONTAINER_MEMORY_REQUEST} - JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} - LOG_LEVEL=${LOG_LEVEL} + - NEW_SCHEDULER=${NEW_SCHEDULER} - SECRET_PERSISTENCE=${SECRET_PERSISTENCE} - TEMPORAL_HOST=${TEMPORAL_HOST} - TRACKING_STRATEGY=${TRACKING_STRATEGY} - WEBAPP_URL=${WEBAPP_URL} - WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT} - WORKSPACE_ROOT=${WORKSPACE_ROOT} - # - NEW_SCHEDULER=valuedoesntmatter ports: - 8001:8001 volumes: diff --git a/tools/bin/acceptance_test_with_new_scheduler.sh b/tools/bin/acceptance_test_with_new_scheduler.sh new file mode 100755 index 0000000000000..b9338db047f85 --- /dev/null +++ b/tools/bin/acceptance_test_with_new_scheduler.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +set -e + +. tools/lib/lib.sh + +assert_root + +echo "Starting app..." + +# Detach so we can run subsequent commands +VERSION=dev TRACKING_STRATEGY=logging NEW_SCHEDULER=true docker-compose up -d +trap "echo 'docker-compose logs:' && docker-compose logs -t --tail 1000 && docker-compose down -v" EXIT + +echo "Waiting for services to begin" +while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8000/api/v1/health)" != "200" ]]; do echo "Waiting for docker deployment.."; sleep 5; done + +echo "Running e2e tests via gradle" +SUB_BUILD=PLATFORM USE_EXTERNAL_DEPLOYMENT=true ./gradlew :airbyte-tests:acceptanceTests --rerun-tasks --scan