Skip to content

Commit

Permalink
Bmoric/extract destination api (#18502)
Browse files Browse the repository at this point in the history
* Tmp

* Extract the Attempt API from the V1 API

* Add comments

* Move Connection API out of configuration API

* format

* format

* Rename to Controller

* Rename to Controller

* Add values to the factory

* Change the constructor to use hadler instead of objects needed by the handler

* Update with new tags.

* Fix PMD errors

* Add explicit path to the controller

* Extract destiantion API
  • Loading branch information
benmoriceau authored and nataly committed Nov 3, 2022
1 parent 8635d97 commit 9505596
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 14 deletions.
12 changes: 12 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -262,6 +264,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();

final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);

final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
Expand All @@ -270,6 +274,13 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
trackingClient,
eventRunner);

final DestinationHandler destinationHandler = new DestinationHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
schemaValidator,
connectionsHandler);

final OperationsHandler operationsHandler = new OperationsHandler(configRepository);

final SchedulerHandler schedulerHandler = new SchedulerHandler(
Expand Down Expand Up @@ -303,6 +314,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
jobsFlyway,
attemptHandler,
connectionsHandler,
destinationHandler,
operationsHandler,
schedulerHandler);
}
Expand Down
14 changes: 12 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
import io.airbyte.server.apis.AttemptApiController;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.apis.ConnectionApiController;
import io.airbyte.server.apis.DestinationApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.binders.DestinationApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
import io.airbyte.server.apis.factories.DestinationApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.EventRunner;
Expand Down Expand Up @@ -54,6 +58,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final DestinationHandler destinationApiHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler);

Expand All @@ -78,6 +83,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final Flyway jobsFlyway,
final AttemptHandler attemptHandler,
final ConnectionsHandler connectionsHandler,
final DestinationHandler destinationApiHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
Expand Down Expand Up @@ -111,9 +117,13 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
schedulerHandler,
mdc);

DestinationApiFactory.setValues(destinationApiHandler, schedulerHandler, mdc);

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class, ConnectionApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder(), new ConnectionApiBinder());
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class, AttemptApiController.class, ConnectionApiController.class,
DestinationApiController.class);
final Set<Object> components = Set.of(new CorsFilter(), new ConfigurationApiBinder(), new AttemptApiBinder(), new ConnectionApiBinder(),
new DestinationApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SecretsRepositoryReader secretsRepositoryReader,
final SecretsRepositoryWriter secretsRepositoryWriter,

final SynchronousSchedulerClient synchronousSchedulerClient,
final Database configsDatabase,
final Database jobsDatabase,
Expand Down Expand Up @@ -603,52 +604,85 @@ public DestinationDefinitionSpecificationRead getDestinationDefinitionSpecificat

// DESTINATION IMPLEMENTATION

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public DestinationRead createDestination(final DestinationCreate destinationCreate) {
return execute(() -> destinationHandler.createDestination(destinationCreate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public void deleteDestination(final DestinationIdRequestBody destinationIdRequestBody) {
execute(() -> {
destinationHandler.deleteDestination(destinationIdRequestBody);
return null;
});
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public DestinationRead updateDestination(final DestinationUpdate destinationUpdate) {
return execute(() -> destinationHandler.updateDestination(destinationUpdate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public DestinationReadList listDestinationsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> destinationHandler.listDestinationsForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public DestinationReadList searchDestinations(final DestinationSearch destinationSearch) {
return execute(() -> destinationHandler.searchDestinations(destinationSearch));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public DestinationRead getDestination(final DestinationIdRequestBody destinationIdRequestBody) {
return execute(() -> destinationHandler.getDestination(destinationIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public DestinationRead cloneDestination(final DestinationCloneRequestBody destinationCloneRequestBody) {
return execute(() -> destinationHandler.cloneDestination(destinationCloneRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public CheckConnectionRead checkConnectionToDestination(final DestinationIdRequestBody destinationIdRequestBody) {
return execute(() -> schedulerHandler.checkDestinationConnectionFromDestinationId(destinationIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link DestinationApiController}. Since the path of
* {@link DestinationApiController} is more granular, it will override this implementation
*/
@Override
public CheckConnectionRead checkConnectionToDestinationForUpdate(final DestinationUpdate destinationUpdate) {
return execute(() -> schedulerHandler.checkDestinationConnectionFromDestinationIdForUpdate(destinationUpdate));
throw new NotImplementedException();
}

// CONNECTION
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.DestinationApi;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.DestinationCloneRequestBody;
import io.airbyte.api.model.generated.DestinationCreate;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.DestinationReadList;
import io.airbyte.api.model.generated.DestinationSearch;
import io.airbyte.api.model.generated.DestinationUpdate;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import javax.ws.rs.Path;
import lombok.AllArgsConstructor;

@Path("/v1/destinations")
@AllArgsConstructor
public class DestinationApiController implements DestinationApi {

private final DestinationHandler destinationHandler;
private final SchedulerHandler schedulerHandler;

@Override
public CheckConnectionRead checkConnectionToDestination(final DestinationIdRequestBody destinationIdRequestBody) {
return ConfigurationApi.execute(() -> schedulerHandler.checkDestinationConnectionFromDestinationId(destinationIdRequestBody));
}

@Override
public CheckConnectionRead checkConnectionToDestinationForUpdate(final DestinationUpdate destinationUpdate) {
return ConfigurationApi.execute(() -> schedulerHandler.checkDestinationConnectionFromDestinationIdForUpdate(destinationUpdate));
}

@Override
public DestinationRead cloneDestination(final DestinationCloneRequestBody destinationCloneRequestBody) {
return ConfigurationApi.execute(() -> destinationHandler.cloneDestination(destinationCloneRequestBody));
}

@Override
public DestinationRead createDestination(final DestinationCreate destinationCreate) {
return ConfigurationApi.execute(() -> destinationHandler.createDestination(destinationCreate));
}

@Override
public void deleteDestination(final DestinationIdRequestBody destinationIdRequestBody) {
ConfigurationApi.execute(() -> {
destinationHandler.deleteDestination(destinationIdRequestBody);
return null;
});
}

@Override
public DestinationRead getDestination(final DestinationIdRequestBody destinationIdRequestBody) {
return ConfigurationApi.execute(() -> destinationHandler.getDestination(destinationIdRequestBody));
}

@Override
public DestinationReadList listDestinationsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return ConfigurationApi.execute(() -> destinationHandler.listDestinationsForWorkspace(workspaceIdRequestBody));
}

@Override
public DestinationReadList searchDestinations(final DestinationSearch destinationSearch) {
return ConfigurationApi.execute(() -> destinationHandler.searchDestinations(destinationSearch));
}

@Override
public DestinationRead updateDestination(final DestinationUpdate destinationUpdate) {
return ConfigurationApi.execute(() -> destinationHandler.updateDestination(destinationUpdate));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.DestinationApiController;
import io.airbyte.server.apis.factories.DestinationApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class DestinationApiBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(DestinationApiFactory.class)
.to(DestinationApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.server.apis.DestinationApiController;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import java.util.Map;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;

public class DestinationApiFactory implements Factory<DestinationApiController> {

private static DestinationHandler destinationHandler;
private static SchedulerHandler schedulerHandler;
private static Map<String, String> mdc;

public static void setValues(final DestinationHandler destinationHandler,
final SchedulerHandler schedulerHandler,
final Map<String, String> mdc) {
DestinationApiFactory.destinationHandler = destinationHandler;
DestinationApiFactory.schedulerHandler = schedulerHandler;
DestinationApiFactory.mdc = mdc;
}

@Override
public DestinationApiController provide() {
MDC.setContextMap(DestinationApiFactory.mdc);

return new DestinationApiController(destinationHandler, schedulerHandler);
}

@Override
public void dispose(final DestinationApiController instance) {
/* no op */
}

}

0 comments on commit 9505596

Please sign in to comment.