Skip to content

Commit

Permalink
Re-org reset after backup (#10046)
Browse files Browse the repository at this point in the history
Make sure that we don't call the update operation of the new scheduler if we are performing a reset operation with the new scheduler being activated.
  • Loading branch information
benmoriceau authored Feb 3, 2022
1 parent a938c4d commit dadcf2d
Show file tree
Hide file tree
Showing 5 changed files with 553 additions and 476 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public ConfigurationApi(final ConfigRepository configRepository,
schedulerHandler,
operationsHandler,
featureFlags,
temporalWorkerRunFactory);
temporalWorkerRunFactory,
connectionHelper);
healthCheckHandler = new HealthCheckHandler();
archiveHandler = new ArchiveHandler(
airbyteVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,17 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync

public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {
return updateConnection(connectionUpdate, false);
}

public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate, boolean isAReset)
throws ConfigNotFoundException, IOException, JsonValidationException {
if (featureFlags.usesNewScheduler()) {
connectionHelper.updateConnection(connectionUpdate);

temporalWorkerRunFactory.update(connectionUpdate);
if (!isAReset) {
temporalWorkerRunFactory.update(connectionUpdate);
}

return connectionHelper.buildConnectionRead(connectionUpdate.getConnectionId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -71,6 +72,7 @@ public class WebBackendConnectionsHandler {
private final OperationsHandler operationsHandler;
private final FeatureFlags featureFlags;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final ConnectionHelper connectionHelper;

public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
Expand Down Expand Up @@ -255,23 +257,31 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);
final ConnectionRead connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (MoreBooleans.isTruthy(webBackendConnectionUpdate.getWithRefreshedCatalog())) {
final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(webBackendConnectionUpdate.getConnectionId());

if (featureFlags.usesNewScheduler()) {
temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId());

temporalWorkerRunFactory.startNewManualSync(webBackendConnectionUpdate.getConnectionId());
} else {
ConnectionRead connectionRead;
boolean needReset = MoreBooleans.isTruthy(webBackendConnectionUpdate.getWithRefreshedCatalog());
if (!featureFlags.usesNewScheduler()) {
connectionRead = connectionsHandler.updateConnection(connectionUpdate);
if (needReset) {
final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(webBackendConnectionUpdate.getConnectionId());
// wait for this to execute
schedulerHandler.resetConnection(connectionId);

// just create the job
schedulerHandler.syncConnection(connectionId);
}
} else {
connectionRead = connectionsHandler.updateConnection(connectionUpdate, !needReset);

if (needReset) {
temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId());

temporalWorkerRunFactory.startNewManualSync(webBackendConnectionUpdate.getConnectionId());

connectionRead = connectionHelper.buildConnectionRead(connectionUpdate.getConnectionId());
}
}

return buildWebBackendConnectionRead(connectionRead);
}

Expand Down
Loading

0 comments on commit dadcf2d

Please sign in to comment.