Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace WorkerPlane with Micronaut environments #17286

Merged
merged 3 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ LOG_LEVEL=INFO

### APPLICATIONS ###
# Worker #
WORKERS_MICRONAUT_ENVIRONMENTS=control
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
# Relevant to scaling.
MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
Expand Down
2 changes: 1 addition & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
SYNC_JOB_MAX_ATTEMPTS=3
SYNC_JOB_MAX_TIMEOUT_DAYS=3
WORKERS_MICRONAUT_ENVIRONMENTS=control
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane

# Sentry
SENTRY_DSN=""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,15 +568,6 @@ public interface Configs {
*/
boolean shouldRunConnectionManagerWorkflows();

/**
* Define if the worker is operating within Airbyte's Control Plane, or within an external Data
* Plane. - Workers in the Control Plane process tasks related to control-flow, like scheduling and
* routing, as well as data syncing tasks that are enqueued for the Control Plane's default task
* queue. - Workers in a Data Plane process only tasks related to data syncing that are specifically
* enqueued for that worker's particular Data Plane.
*/
WorkerPlane getWorkerPlane();

// Worker - Control Plane configs

/**
Expand Down Expand Up @@ -717,9 +708,4 @@ enum SecretPersistenceType {
VAULT
}

enum WorkerPlane {
CONTROL_PLANE,
DATA_PLANE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS";
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";
private static final String WORKER_PLANE = "WORKER_PLANE";

// Worker - Control plane configs
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
Expand Down Expand Up @@ -229,8 +228,6 @@ public EnvConfigs(final Map<String, String> envMap) {
this.getAllEnvKeys = envMap::keySet;
this.logConfigs = new LogConfigs(getLogConfiguration());
this.stateStorageCloudConfigs = getStateStorageConfiguration().orElse(null);

validateSyncWorkflowConfigs();
}

private Optional<CloudStorageConfigs> getLogConfiguration() {
Expand Down Expand Up @@ -931,11 +928,6 @@ public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

@Override
public WorkerPlane getWorkerPlane() {
return getEnvOrDefault(WORKER_PLANE, WorkerPlane.CONTROL_PLANE, s -> WorkerPlane.valueOf(s.toUpperCase()));
}

// Worker - Control plane

@Override
Expand Down Expand Up @@ -973,22 +965,6 @@ public String getDataPlaneServiceAccountEmail() {
return getEnvOrDefault(DATA_PLANE_SERVICE_ACCOUNT_EMAIL, "");
}

/**
* Ensures the user hasn't configured themselves into a corner by making sure that the worker is set
* up to properly process sync workflows. With sensible defaults, it should be hard to fail this
* validation, but this provides a safety net regardless.
*/
private void validateSyncWorkflowConfigs() {
if (shouldRunSyncWorkflows()) {
if (getWorkerPlane().equals(WorkerPlane.DATA_PLANE) && getDataSyncTaskQueues().isEmpty()) {
throw new IllegalArgumentException(String.format(
"When %s is true, the worker must either be configured as a Control Plane worker, or %s must be non-empty.",
SHOULD_RUN_SYNC_WORKFLOWS,
DATA_SYNC_TASK_QUEUES));
}
}
}

@Override
public Set<Integer> getTemporalWorkerPorts() {
final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, "");
Expand Down
2 changes: 1 addition & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ run {

environment 'AIRBYTE_ROLE', System.getenv('AIRBYTE_ROLE')
environment 'AIRBYTE_VERSION', env.VERSION
environment 'MICRONAUT_ENVIRONMENTS', 'local,control'
environment 'MICRONAUT_ENVIRONMENTS', 'control-plane'
}

task cloudStorageIntegrationTest(type: Test) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.Configs.WorkerPlane;
import io.airbyte.config.MaxWorkersConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
Expand All @@ -23,6 +22,7 @@
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.workers.config.WorkerMode;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
Expand Down Expand Up @@ -138,21 +138,21 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private WorkerEnvironment workerEnvironment;
@Inject
private WorkerFactory workerFactory;
@Inject
private WorkerPlane workerPlane;
@Value("${airbyte.workspace.root}")
private String workspaceRoot;
@Value("${temporal.cloud.namespace}")
private String temporalCloudNamespace;
@Value("${airbyte.data.sync.task-queue}")
private String syncTaskQueue;
@Inject
private Environment environment;

@Override
public void onApplicationEvent(final ServiceReadyEvent event) {
try {
initializeCommonDependencies();

if (WorkerPlane.CONTROL_PLANE.equals(workerPlane)) {
if (environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) {
initializeControlPlaneDependencies();
} else {
log.info("Skipping Control Plane dependency initialization.");
Expand All @@ -163,7 +163,7 @@ public void onApplicationEvent(final ServiceReadyEvent event) {
log.info("Starting worker factory...");
workerFactory.start();

log.info("Application initialized (mode = {}).", workerPlane);
log.info("Application initialized.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we see the environment.getActiveNames somewhere? Otherwise, I think it could be worth logging for debugging purpose.

} catch (final DatabaseCheckException | ExecutionException | InterruptedException | IOException | TimeoutException e) {
log.error("Unable to initialize application.", e);
throw new IllegalStateException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,15 @@
public class ActivityBeanFactory {

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("checkConnectionActivities")
public List<Object> checkConnectionActivities(
final CheckConnectionActivity checkConnectionActivity) {
return List.of(checkConnectionActivity);
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("connectionManagerActivities")
public List<Object> connectionManagerActivities(
final GenerateInputActivity generateInputActivity,
Expand All @@ -79,17 +77,15 @@ public List<Object> connectionManagerActivities(
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("discoverActivities")
public List<Object> discoverActivities(
final DiscoverCatalogActivity discoverCatalogActivity) {
return List.of(discoverCatalogActivity);
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("specActivities")
public List<Object> specActivities(
final SpecActivity specActivity) {
Expand Down Expand Up @@ -154,8 +150,7 @@ public ActivityOptions shortActivityOptions(@Property(name = "airbyte.activity.m
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("specActivityOptions")
public ActivityOptions specActivityOptions() {
return ActivityOptions.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.config.Configs.WorkerPlane;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.FileInputStream;
Expand Down Expand Up @@ -58,10 +58,10 @@ public AirbyteApiClient airbyteApiClient(

@Singleton
@Named("internalApiScheme")
public String internalApiScheme(final WorkerPlane workerPlane) {
public String internalApiScheme(final Environment environment) {
// control plane workers communicate with the Airbyte API within their internal network, so https
// isn't needed
return WorkerPlane.CONTROL_PLANE.equals(workerPlane) ? "http" : "https";
return environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE) ? "http" : "https";
}

/**
Expand All @@ -81,12 +81,12 @@ public String internalApiAuthToken(
@Value("${airbyte.control.plane.auth-endpoint}") final String controlPlaneAuthEndpoint,
@Value("${airbyte.data.plane.service-account.email}") final String dataPlaneServiceAccountEmail,
@Value("${airbyte.data.plane.service-account.credentials-path}") final String dataPlaneServiceAccountCredentialsPath,
final WorkerPlane workerPlane) {
if (WorkerPlane.CONTROL_PLANE.equals(workerPlane)) {
final Environment environment) {
if (environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) {
// control plane workers communicate with the Airbyte API within their internal network, so a signed
// JWT isn't needed
return airbyteApiAuthHeaderValue;
} else if (WorkerPlane.DATA_PLANE.equals(workerPlane)) {
} else if (environment.getActiveNames().contains(WorkerMode.DATA_PLANE)) {
try {
final Date now = new Date();
final Date expTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(JWT_TTL_MINUTES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.airbyte.config.Configs.SecretPersistenceType;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.Configs.WorkerPlane;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
Expand Down Expand Up @@ -73,11 +72,6 @@ public WorkerEnvironment workerEnvironment(@Value("${airbyte.worker.env}") final
return convertToEnum(workerEnv, WorkerEnvironment::valueOf, WorkerEnvironment.DOCKER);
}

@Singleton
public WorkerPlane workerPlane(@Value("${airbyte.worker.plane}") final String workerPlane) {
return convertToEnum(workerPlane, WorkerPlane::valueOf, WorkerPlane.CONTROL_PLANE);
}

@Singleton
@Named("workspaceRoot")
public Path workspaceRoot(@Value("${airbyte.workspace.root}") final String workspaceRoot) {
Expand Down Expand Up @@ -106,8 +100,7 @@ public FeatureFlags featureFlags() {
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public JobNotifier jobNotifier(
final ConfigRepository configRepository,
final TrackingClient trackingClient,
Expand All @@ -121,8 +114,7 @@ public JobNotifier jobNotifier(
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public JobTracker jobTracker(
final ConfigRepository configRepository,
final JobPersistence jobPersistence,
Expand All @@ -131,8 +123,7 @@ public JobTracker jobTracker(
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags) {
return JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
Expand All @@ -141,15 +132,13 @@ public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public WebUrlHelper webUrlHelper(@Value("${airbyte.web-app.url}") final String webAppUrl) {
return new WebUrlHelper(webAppUrl);
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public WorkspaceHelper workspaceHelper(
final ConfigRepository configRepository,
final JobPersistence jobPersistence) {
Expand Down
Loading