Skip to content

Commit

Permalink
Bmoric/extract jobs api (#18524)
Browse files Browse the repository at this point in the history
* Tmp

* Extract the Attempt API from the V1 API

* Add comments

* Move Connection API out of configuration API

* format

* format

* Rename to Controller

* Rename to Controller

* Add values to the factory

* Change the constructor to use hadler instead of objects needed by the handler

* Update with new tags.

* tmp

* Fix PMD errors

* Extract DB migrator

* Add something that I forgot

* extract destination definition api

* restore destination factory initialization

* extract destination definition specification api

* format

* format

* format

* extract health check api

* extract jobs api

* fix test

* format

* Add missing declaration
  • Loading branch information
benmoriceau authored Nov 2, 2022
1 parent 33194df commit 417481e
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 7 deletions.
24 changes: 24 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.HealthCheckHandler;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceDefinitionsHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
Expand Down Expand Up @@ -295,6 +298,26 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

final HealthCheckHandler healthCheckHandler = new HealthCheckHandler(configRepository);

final SourceHandler sourceHandler = new SourceHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
schemaValidator,
connectionsHandler);

final SourceDefinitionsHandler sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, syncSchedulerClient, sourceHandler);

final JobHistoryHandler jobHistoryHandler = new JobHistoryHandler(
jobPersistence,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
connectionsHandler,
sourceHandler,
sourceDefinitionsHandler,
destinationHandler,
destinationDefinitionsHandler,
configs.getAirbyteVersion());

LOGGER.info("Starting server...");

return apiFactory.create(
Expand All @@ -320,6 +343,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
destinationDefinitionsHandler,
destinationHandler,
healthCheckHandler,
jobHistoryHandler,
operationsHandler,
schedulerHandler);
}
Expand Down
14 changes: 12 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,30 @@
import io.airbyte.server.apis.DestinationDefinitionApiController;
import io.airbyte.server.apis.DestinationDefinitionSpecificationApiController;
import io.airbyte.server.apis.HealthApiController;
import io.airbyte.server.apis.JobsApiController;
import io.airbyte.server.apis.binders.AttemptApiBinder;
import io.airbyte.server.apis.binders.ConnectionApiBinder;
import io.airbyte.server.apis.binders.DbMigrationBinder;
import io.airbyte.server.apis.binders.DestinationApiBinder;
import io.airbyte.server.apis.binders.DestinationDefinitionApiBinder;
import io.airbyte.server.apis.binders.DestinationDefinitionSpecificationApiBinder;
import io.airbyte.server.apis.binders.HealthApiBinder;
import io.airbyte.server.apis.binders.JobsApiBinder;
import io.airbyte.server.apis.factories.AttemptApiFactory;
import io.airbyte.server.apis.factories.ConnectionApiFactory;
import io.airbyte.server.apis.factories.DbMigrationApiFactory;
import io.airbyte.server.apis.factories.DestinationApiFactory;
import io.airbyte.server.apis.factories.DestinationDefinitionApiFactory;
import io.airbyte.server.apis.factories.DestinationDefinitionSpecificationApiFactory;
import io.airbyte.server.apis.factories.HealthApiFactory;
import io.airbyte.server.apis.factories.JobsApiFactory;
import io.airbyte.server.handlers.AttemptHandler;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.HealthCheckHandler;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.OperationsHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.scheduler.EventRunner;
Expand Down Expand Up @@ -77,6 +81,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien
final DestinationDefinitionsHandler destinationDefinitionsHandler,
final DestinationHandler destinationApiHandler,
final HealthCheckHandler healthCheckHandler,
final JobHistoryHandler jobHistoryHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler);

Expand Down Expand Up @@ -105,6 +110,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final DestinationDefinitionsHandler destinationDefinitionsHandler,
final DestinationHandler destinationApiHandler,
final HealthCheckHandler healthCheckHandler,
final JobHistoryHandler jobHistoryHandler,
final OperationsHandler operationsHandler,
final SchedulerHandler schedulerHandler) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
Expand Down Expand Up @@ -148,6 +154,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul

HealthApiFactory.setValues(healthCheckHandler);

JobsApiFactory.setValues(jobHistoryHandler, schedulerHandler);

// server configurations
final Set<Class<?>> componentClasses = Set.of(
ConfigurationApi.class,
Expand All @@ -157,7 +165,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
DestinationApiController.class,
DestinationDefinitionApiController.class,
DestinationDefinitionSpecificationApiController.class,
HealthApiController.class);
HealthApiController.class,
JobsApiController.class);

final Set<Object> components = Set.of(
new CorsFilter(),
Expand All @@ -168,7 +177,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
new DestinationApiBinder(),
new DestinationDefinitionApiBinder(),
new DestinationDefinitionSpecificationApiBinder(),
new HealthApiBinder());
new HealthApiBinder(),
new JobsApiBinder());

// construct server
return new ServerApp(airbyteVersion, componentClasses, components);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,33 +889,57 @@ public SourceDiscoverSchemaRead executeSourceDiscoverSchema(final SourceCoreConf
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceCreate(sourceCreate));
}

/**
* This implementation has been moved to {@link JobsApiController}. Since the path of
* {@link JobsApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) {
return execute(() -> schedulerHandler.cancelJob(jobIdRequestBody));
throw new NotImplementedException();
}

// JOB HISTORY

/**
* This implementation has been moved to {@link JobsApiController}. Since the path of
* {@link JobsApiController} is more granular, it will override this implementation
*/
@Override
public JobReadList listJobsFor(final JobListRequestBody jobListRequestBody) {
return execute(() -> jobHistoryHandler.listJobsFor(jobListRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link JobsApiController}. Since the path of
* {@link JobsApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) {
return execute(() -> jobHistoryHandler.getJobInfo(jobIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link JobsApiController}. Since the path of
* {@link JobsApiController} is more granular, it will override this implementation
*/
@Override
public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody) {
return execute(() -> jobHistoryHandler.getJobInfoLight(jobIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link JobsApiController}. Since the path of
* {@link JobsApiController} is more granular, it will override this implementation
*/
@Override
public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) {
return execute(() -> jobHistoryHandler.getJobDebugInfo(jobIdRequestBody));
throw new NotImplementedException();
}

/**
* This implementation has been moved to {@link JobsApiController}. Since the path of
* {@link JobsApiController} is more granular, it will override this implementation
*/
@Override
public AttemptNormalizationStatusReadList getAttemptNormalizationStatusesForJob(final JobIdRequestBody jobIdRequestBody) {
return execute(() -> jobHistoryHandler.getAttemptNormalizationStatuses(jobIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.JobsApi;
import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.model.generated.JobDebugInfoRead;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import javax.ws.rs.Path;
import lombok.AllArgsConstructor;

@Path("/v1/jobs")
@AllArgsConstructor
public class JobsApiController implements JobsApi {

private final JobHistoryHandler jobHistoryHandler;
private final SchedulerHandler schedulerHandler;

@Override
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) {
return ConfigurationApi.execute(() -> schedulerHandler.cancelJob(jobIdRequestBody));
}

@Override
public AttemptNormalizationStatusReadList getAttemptNormalizationStatusesForJob(final JobIdRequestBody jobIdRequestBody) {
return ConfigurationApi.execute(() -> jobHistoryHandler.getAttemptNormalizationStatuses(jobIdRequestBody));
}

@Override
public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) {
return ConfigurationApi.execute(() -> jobHistoryHandler.getJobDebugInfo(jobIdRequestBody));
}

@Override
public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) {
return ConfigurationApi.execute(() -> jobHistoryHandler.getJobInfo(jobIdRequestBody));
}

@Override
public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody) {
return ConfigurationApi.execute(() -> jobHistoryHandler.getJobInfoLight(jobIdRequestBody));
}

@Override
public JobReadList listJobsFor(final JobListRequestBody jobListRequestBody) {
return ConfigurationApi.execute(() -> jobHistoryHandler.listJobsFor(jobListRequestBody));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.JobsApiController;
import io.airbyte.server.apis.factories.JobsApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class JobsApiBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(JobsApiFactory.class)
.to(JobsApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.server.apis.JobsApiController;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import org.glassfish.hk2.api.Factory;

public class JobsApiFactory implements Factory<JobsApiController> {

private static JobHistoryHandler jobHistoryHandler;
private static SchedulerHandler schedulerHandler;

public static void setValues(final JobHistoryHandler jobHistoryHandler, final SchedulerHandler schedulerHandler) {
JobsApiFactory.jobHistoryHandler = jobHistoryHandler;
JobsApiFactory.schedulerHandler = schedulerHandler;
}

@Override
public JobsApiController provide() {
return new JobsApiController(jobHistoryHandler, schedulerHandler);
}

@Override
public void dispose(final JobsApiController instance) {
/* no op */
}

}

0 comments on commit 417481e

Please sign in to comment.