Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/move flag check to handler #10469

Merged
merged 27 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
90d7cfe
If an activity is failing, stuck the workflow and make it queriable
benmoriceau Feb 5, 2022
9468e4a
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/sig…
benmoriceau Feb 8, 2022
d997ce1
Allow a workflow to be stuck
benmoriceau Feb 8, 2022
2713985
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/sig…
benmoriceau Feb 8, 2022
fa3412e
Update test with merge issues
benmoriceau Feb 8, 2022
6aefd6e
another merge issue fix
benmoriceau Feb 8, 2022
a572187
More merge conflict resolution
benmoriceau Feb 9, 2022
6fc92d7
Update merge conflicts
benmoriceau Feb 9, 2022
05a4b68
Finish to fix the merge conflict
benmoriceau Feb 9, 2022
e7b7f87
Ensure that the preper flag is set
benmoriceau Feb 10, 2022
5ae9a9d
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/sig…
benmoriceau Feb 15, 2022
9fe026f
fix new tests
benmoriceau Feb 15, 2022
f076615
Extract stuck test
benmoriceau Feb 15, 2022
dc75830
Rename and PR comments
benmoriceau Feb 15, 2022
53b94ab
Remove unused state override
benmoriceau Feb 15, 2022
26dbdc9
Move what we can inside the cancellation scope
benmoriceau Feb 15, 2022
f939771
extract jobId creation and make it non optional
benmoriceau Feb 15, 2022
d6e1e5a
tmp
benmoriceau Feb 16, 2022
f5a013b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ref…
benmoriceau Feb 16, 2022
05ffdf3
Re-introduce workflow state
benmoriceau Feb 16, 2022
994e83a
re-organize the connection manager
benmoriceau Feb 17, 2022
9a3f3e7
Move the private methods to the bottom.
benmoriceau Feb 17, 2022
f781248
PR Comments
benmoriceau Feb 17, 2022
146aebe
Add internal state
benmoriceau Feb 17, 2022
fc4a2f8
tmp
benmoriceau Feb 18, 2022
ab4606e
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/mov…
benmoriceau Feb 18, 2022
b1422cd
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/mov…
benmoriceau Feb 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final Path workspaceRoot;
private final FeatureFlags featureFlags;

public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
Expand Down Expand Up @@ -191,7 +190,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobPersistence,
jobNotifier,
temporalService,
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory);
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory, featureFlags);
final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper, workerConfigs);
connectionsHandler = new ConnectionsHandler(
configRepository,
Expand Down Expand Up @@ -232,7 +231,6 @@ public ConfigurationApi(final ConfigRepository configRepository,
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, jobsDatabase);
this.featureFlags = featureFlags;
}

// WORKSPACE
Expand Down Expand Up @@ -560,19 +558,11 @@ public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBo

@Override
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.createManualRun(connectionIdRequestBody.getConnectionId()));
}

return execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
}

@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody.getConnectionId()));
}

return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
}

Expand Down Expand Up @@ -634,10 +624,6 @@ public SourceDiscoverSchemaRead executeSourceDiscoverSchema(final SourceCoreConf

@Override
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.createNewSchedulerCancellation(jobIdRequestBody.getId()));
}

return execute(() -> schedulerHandler.cancelJob(jobIdRequestBody));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig.ConfigType;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class SchedulerHandler {
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final FeatureFlags featureFlags;

public SchedulerHandler(final ConfigRepository configRepository,
final SchedulerJobClient schedulerJobClient,
Expand All @@ -98,7 +100,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final FeatureFlags featureFlags) {
this(
configRepository,
schedulerJobClient,
Expand All @@ -111,7 +114,9 @@ public SchedulerHandler(final ConfigRepository configRepository,
oAuthConfigSupplier,
workerEnvironment,
logConfigs,
temporalWorkerRunFactory);
temporalWorkerRunFactory,
featureFlags,
new JobConverter(workerEnvironment, logConfigs));
}

@VisibleForTesting
Expand All @@ -126,7 +131,9 @@ public SchedulerHandler(final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final FeatureFlags featureFlags,
final JobConverter jobConverter) {
this.configRepository = configRepository;
this.schedulerJobClient = schedulerJobClient;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -138,8 +145,9 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.oAuthConfigSupplier = oAuthConfigSupplier;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.jobConverter = new JobConverter(workerEnvironment, logConfigs);
this.temporalWorkerRunFactory = temporalWorkerRunFactory;
this.featureFlags = featureFlags;
this.jobConverter = jobConverter;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -307,6 +315,9 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(

public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
if (featureFlags.usesNewScheduler()) {
return createManualRun(connectionIdRequestBody.getConnectionId());
}
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync = configRepository.getStandardSync(connectionId);

Expand Down Expand Up @@ -347,20 +358,11 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ
return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}

final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
if (featureFlags.usesNewScheduler()) {
return resetConnectionWithNewScheduler(connectionIdRequestBody.getConnectionId());
}
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync = configRepository.getStandardSync(connectionId);

Expand Down Expand Up @@ -394,6 +396,10 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques

// todo (cgardens) - this method needs a test.
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException {
if (featureFlags.usesNewScheduler()) {
createNewSchedulerCancellation(jobIdRequestBody.getId());
}

final long jobId = jobIdRequestBody.getId();

// prevent this job from being scheduled again
Expand Down Expand Up @@ -453,7 +459,20 @@ private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID des
return destinationDef.getSpec();
}

public JobInfoRead createManualRun(final UUID connectionId) throws IOException {
private JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));

if (cancellationSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
}

final Job cancelledJob = jobPersistence.getJob(id);
return jobConverter.getJobInfoRead(cancelledJob);
}

private JobInfoRead createManualRun(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.startNewManualSync(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
Expand All @@ -465,17 +484,16 @@ public JobInfoRead createManualRun(final UUID connectionId) throws IOException {
return jobConverter.getJobInfoRead(job);
}

public JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));
private JobInfoRead resetConnectionWithNewScheduler(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId);

if (cancellationSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}

final Job cancelledJob = jobPersistence.getJob(id);
return jobConverter.getJobInfoRead(cancelledJob);
final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand All @@ -35,6 +36,7 @@
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.Configs.WorkerEnvironment;
Expand Down Expand Up @@ -69,11 +71,13 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.helpers.DestinationHelpers;
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
Expand Down Expand Up @@ -129,6 +133,8 @@ class SchedulerHandlerTest {
private JsonSchemaValidator jsonSchemaValidator;
private JobPersistence jobPersistence;
private TemporalWorkerRunFactory temporalWorkerRunFactory;
private FeatureFlags featureFlags;
private JobConverter jobConverter;

@BeforeEach
void setup() {
Expand All @@ -147,6 +153,11 @@ void setup() {
final JobNotifier jobNotifier = mock(JobNotifier.class);
temporalWorkerRunFactory = mock(TemporalWorkerRunFactory.class);

featureFlags = mock(FeatureFlags.class);
when(featureFlags.usesNewScheduler()).thenReturn(false);

jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY));

schedulerHandler = new SchedulerHandler(
configRepository,
schedulerJobClient,
Expand All @@ -159,7 +170,9 @@ void setup() {
mock(OAuthConfigSupplier.class),
WorkerEnvironment.DOCKER,
LogConfigs.EMPTY,
temporalWorkerRunFactory);
temporalWorkerRunFactory,
featureFlags,
jobConverter);
}

@Test
Expand Down Expand Up @@ -581,6 +594,30 @@ void testEnumConversion() {
assertTrue(Enums.isCompatible(JobStatus.class, io.airbyte.api.model.JobStatus.class));
}

@Test
Copy link
Contributor Author

@benmoriceau benmoriceau Feb 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Add more tests.

void testNewSchedulerSync() throws JsonValidationException, ConfigNotFoundException, IOException {
when(featureFlags.usesNewScheduler()).thenReturn(true);

UUID connectionId = UUID.randomUUID();

long jobId = 123L;
ManualSyncSubmissionResult manualSyncSubmissionResult = ManualSyncSubmissionResult
.builder()
.failingReason(Optional.empty())
.jobId(Optional.of(jobId))
.build();

when(temporalWorkerRunFactory.startNewManualSync(connectionId))
.thenReturn(manualSyncSubmissionResult);

doReturn(new JobInfoRead())
.when(jobConverter).getJobInfoRead(any());

schedulerHandler.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

verify(temporalWorkerRunFactory).startNewManualSync(connectionId);
}

private static List<StandardSyncOperation> getOperations(final StandardSync standardSync) {
if (standardSync.getOperationIds() != null && !standardSync.getOperationIds().isEmpty()) {
return List.of(getOperation(standardSync.getOperationIds().get(0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
Expand Down Expand Up @@ -235,7 +236,8 @@ public void update(final ConnectionUpdate connectionUpdate) throws JsonValidatio
}

@Value
public class ManualSyncSubmissionResult {
@Builder
public static class ManualSyncSubmissionResult {

final Optional<String> failingReason;
final Optional<Long> jobId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,16 @@ private void reportJobStarting() {
}

/**
* Start the child {@link SyncWorkflow}. We are using a child workflow here for two main reason:
* <<<<<<< HEAD Start the child SyncWorkflow ======= Start the child {@link SyncWorkflow}. We are
* using a child workflow here for two main reason:
* <p>
* - Originally the Sync workflow was living by himself and was launch by the scheduler. In order to
* limit the potential migration issues, we kept the {@link SyncWorkflow} as is and launch it as a
* child workflow.
* <p>
* - The {@link SyncWorkflow} has different requirements than the {@link ConnectionManagerWorkflow}
* since the latter is a long running workflow, in the future, using a different Node pool would
* make sense.
* make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999
*/
private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class,
Expand Down