Skip to content

Commit

Permalink
Bmoric/extract source api (#18944)
Browse files Browse the repository at this point in the history
* Extract Operation API

* Extract scheduler API

* Format

* extract source api
  • Loading branch information
benmoriceau authored Nov 4, 2022
1 parent a53b947 commit 22efa07
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
openApiConfigHandler,
operationsHandler,
schedulerHandler,
sourceHandler,
workspacesHandler);
}

Expand Down
13 changes: 11 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.airbyte.server.apis.OpenapiApiController;
import io.airbyte.server.apis.OperationApiController;
import io.airbyte.server.apis.SchedulerApiController;
import io.airbyte.server.apis.SourceOauthApiController;
import io.airbyte.server.apis.SourceApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.binders.DbMigrationBinder;
Expand All @@ -44,6 +44,7 @@
import io.airbyte.server.apis.binders.OpenapiApiBinder;
import io.airbyte.server.apis.binders.OperationApiBinder;
import io.airbyte.server.apis.binders.SchedulerApiBinder;
import io.airbyte.server.apis.binders.SourceApiBinder;
import io.airbyte.server.apis.binders.SourceOauthApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
Expand All @@ -59,6 +60,7 @@
import io.airbyte.server.apis.factories.OpenapiApiFactory;
import io.airbyte.server.apis.factories.OperationApiFactory;
import io.airbyte.server.apis.factories.SchedulerApiFactory;
import io.airbyte.server.apis.factories.SourceApiFactory;
import io.airbyte.server.apis.factories.SourceOauthApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
Expand All @@ -72,6 +74,7 @@
import io.airbyte.server.handlers.OpenApiConfigHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.server.handlers.WorkspacesHandler;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
Expand Down Expand Up @@ -112,6 +115,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien
final OpenApiConfigHandler openApiConfigHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler,
final SourceHandler sourceHandler,
final WorkspacesHandler workspacesHandler);

class Api implements ServerFactory {
Expand Down Expand Up @@ -145,6 +149,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final OpenApiConfigHandler openApiConfigHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler,
final SourceHandler sourceHandler,
final WorkspacesHandler workspacesHandler) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();

Expand Down Expand Up @@ -203,6 +208,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul

SchedulerApiFactory.setValues(schedulerHandler);

SourceApiFactory.setValues(schedulerHandler, sourceHandler);

// server configurations
final Set<Class<?>> componentClasses = Set.of(
ConfigurationApi.class,
Expand All @@ -220,7 +227,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
OpenapiApiController.class,
OperationApiController.class,
SchedulerApiController.class,
SourceOauthApiController.class);
SourceApiController.class,
SourceOauthApiFactory.class);

final Set<Object> components = Set.of(
new CorsFilter(),
Expand All @@ -239,6 +247,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
new OpenapiApiBinder(),
new OperationApiBinder(),
new SchedulerApiBinder(),
new SourceApiBinder(),
new SourceOauthApiBinder());

// construct server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,57 +447,94 @@ public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRe

// SOURCE IMPLEMENTATION

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceRead createSource(final SourceCreate sourceCreate) {
return execute(() -> sourceHandler.createSource(sourceCreate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceRead updateSource(final SourceUpdate sourceUpdate) {
return execute(() -> sourceHandler.updateSource(sourceUpdate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceReadList searchSources(final SourceSearch sourceSearch) {
return execute(() -> sourceHandler.searchSources(sourceSearch));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) {
return execute(() -> sourceHandler.getSource(sourceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public void deleteSource(final SourceIdRequestBody sourceIdRequestBody) {
execute(() -> {
sourceHandler.deleteSource(sourceIdRequestBody);
return null;
});
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody) {
return execute(() -> sourceHandler.cloneSource(sourceCloneRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public CheckConnectionRead checkConnectionToSource(final SourceIdRequestBody sourceIdRequestBody) {
return execute(() -> schedulerHandler.checkSourceConnectionFromSourceId(sourceIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public CheckConnectionRead checkConnectionToSourceForUpdate(final SourceUpdate sourceUpdate) {
return execute(() -> schedulerHandler.checkSourceConnectionFromSourceIdForUpdate(sourceUpdate));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link SourceApiController}. Since the path of
* {@link SourceApiController} is more granular, it will override this implementation
*/
@Override
public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) {
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaRequestBody));
throw new NotImplementedException();
}

// DB MIGRATION
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.SourceApi;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
import io.airbyte.api.model.generated.SourceCreate;
import io.airbyte.api.model.generated.SourceDiscoverSchemaRead;
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.SourceReadList;
import io.airbyte.api.model.generated.SourceSearch;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceHandler;
import javax.ws.rs.Path;
import lombok.AllArgsConstructor;

@Path("/v1/sources")
@AllArgsConstructor
public class SourceApiController implements SourceApi {

private final SchedulerHandler schedulerHandler;
private final SourceHandler sourceHandler;

@Override
public CheckConnectionRead checkConnectionToSource(final SourceIdRequestBody sourceIdRequestBody) {
return ConfigurationApi.execute(() -> schedulerHandler.checkSourceConnectionFromSourceId(sourceIdRequestBody));
}

@Override
public CheckConnectionRead checkConnectionToSourceForUpdate(final SourceUpdate sourceUpdate) {
return ConfigurationApi.execute(() -> schedulerHandler.checkSourceConnectionFromSourceIdForUpdate(sourceUpdate));
}

@Override
public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody) {
return ConfigurationApi.execute(() -> sourceHandler.cloneSource(sourceCloneRequestBody));
}

@Override
public SourceRead createSource(final SourceCreate sourceCreate) {
return ConfigurationApi.execute(() -> sourceHandler.createSource(sourceCreate));
}

@Override
public void deleteSource(final SourceIdRequestBody sourceIdRequestBody) {
ConfigurationApi.execute(() -> {
sourceHandler.deleteSource(sourceIdRequestBody);
return null;
});
}

@Override
public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceDiscoverSchemaRequestBody sourceDiscoverSchemaRequestBody) {
return ConfigurationApi.execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceId(sourceDiscoverSchemaRequestBody));
}

@Override
public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) {
return ConfigurationApi.execute(() -> sourceHandler.getSource(sourceIdRequestBody));
}

@Override
public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return ConfigurationApi.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody));
}

@Override
public SourceReadList searchSources(final SourceSearch sourceSearch) {
return ConfigurationApi.execute(() -> sourceHandler.searchSources(sourceSearch));
}

@Override
public SourceRead updateSource(final SourceUpdate sourceUpdate) {
return ConfigurationApi.execute(() -> sourceHandler.updateSource(sourceUpdate));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.SourceApiController;
import io.airbyte.server.apis.factories.SourceApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class SourceApiBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(SourceApiFactory.class)
.to(SourceApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.server.apis.SourceApiController;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceHandler;
import org.glassfish.hk2.api.Factory;

public class SourceApiFactory implements Factory<SourceApiController> {

private static SchedulerHandler schedulerHandler;
private static SourceHandler sourceHandler;

public static void setValues(final SchedulerHandler schedulerHandler, final SourceHandler sourceHandler) {
SourceApiFactory.schedulerHandler = schedulerHandler;
SourceApiFactory.sourceHandler = sourceHandler;
}

@Override
public SourceApiController provide() {
return new SourceApiController(schedulerHandler, sourceHandler);
}

@Override
public void dispose(final SourceApiController instance) {
/* no op */
}

}

0 comments on commit 22efa07

Please sign in to comment.