From 1f660c9d748d293e0ea306c2218cec0650382000 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 28 Sep 2022 11:02:50 -0400 Subject: [PATCH] Replace WorkerPlane with Micronaut environments (#17286) * Replace WorkerPlane with Micronaut environments * Set proper environment for airbyte-workers --- .env | 2 +- .env.dev | 2 +- .../main/java/io/airbyte/config/Configs.java | 14 -------- .../java/io/airbyte/config/EnvConfigs.java | 24 ------------- airbyte-workers/build.gradle | 2 +- .../workers/ApplicationInitializer.java | 10 +++--- .../workers/config/ActivityBeanFactory.java | 15 +++----- .../workers/config/ApiClientBeanFactory.java | 12 +++---- .../config/ApplicationBeanFactory.java | 21 +++-------- .../workers/config/DatabaseBeanFactory.java | 36 +++++++------------ .../config/JobErrorReportingBeanFactory.java | 9 ++--- .../config/ProcessFactoryBeanFactory.java | 18 ++++------ .../config/SecretPersistenceBeanFactory.java | 9 ++--- .../workers/config/TemporalBeanFactory.java | 9 ++--- .../WorkerConfigurationBeanFactory.java | 9 ++--- .../io/airbyte/workers/config/WorkerMode.java | 24 +++++++++++++ .../workers/helper/ConnectionHelper.java | 4 +-- .../workers/temporal/TemporalClient.java | 4 +-- .../CheckConnectionActivityImpl.java | 4 +-- .../catalog/DiscoverCatalogActivityImpl.java | 4 +-- .../AutoDisableConnectionActivityImpl.java | 4 +-- .../activities/ConfigFetchActivityImpl.java | 4 +-- .../ConnectionDeletionActivityImpl.java | 4 +-- .../activities/GenerateInputActivityImpl.java | 4 +-- ...obCreationAndStatusUpdateActivityImpl.java | 4 +-- .../activities/RecordMetricActivityImpl.java | 4 +-- .../activities/StreamResetActivityImpl.java | 4 +-- .../WorkflowConfigActivityImpl.java | 4 +-- .../temporal/spec/SpecActivityImpl.java | 4 +-- ...trol.yml => application-control-plane.yml} | 0 .../src/main/resources/application.yml | 1 - charts/airbyte/templates/env-configmap.yaml | 2 +- kube/overlays/dev-integration-test/.env | 2 +- kube/overlays/dev/.env | 2 +- .../overlays/stable-with-resource-limits/.env | 2 +- kube/overlays/stable/.env | 2 +- 36 files changed, 109 insertions(+), 170 deletions(-) create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerMode.java rename airbyte-workers/src/main/resources/{application-control.yml => application-control-plane.yml} (100%) diff --git a/.env b/.env index 5d7aa17eda4de..eb65d8fe9fc1b 100644 --- a/.env +++ b/.env @@ -81,7 +81,7 @@ LOG_LEVEL=INFO ### APPLICATIONS ### # Worker # -WORKERS_MICRONAUT_ENVIRONMENTS=control +WORKERS_MICRONAUT_ENVIRONMENTS=control-plane # Relevant to scaling. MAX_SYNC_WORKERS=5 MAX_SPEC_WORKERS=5 diff --git a/.env.dev b/.env.dev index b1780d7ee8c12..687fbe07b5dc3 100644 --- a/.env.dev +++ b/.env.dev @@ -25,7 +25,7 @@ API_URL=/api/v1/ INTERNAL_API_HOST=airbyte-server:8001 SYNC_JOB_MAX_ATTEMPTS=3 SYNC_JOB_MAX_TIMEOUT_DAYS=3 -WORKERS_MICRONAUT_ENVIRONMENTS=control +WORKERS_MICRONAUT_ENVIRONMENTS=control-plane # Sentry SENTRY_DSN="" diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 5dac93a102c5c..225a7fea9ed1c 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -568,15 +568,6 @@ public interface Configs { */ boolean shouldRunConnectionManagerWorkflows(); - /** - * Define if the worker is operating within Airbyte's Control Plane, or within an external Data - * Plane. - Workers in the Control Plane process tasks related to control-flow, like scheduling and - * routing, as well as data syncing tasks that are enqueued for the Control Plane's default task - * queue. - Workers in a Data Plane process only tasks related to data syncing that are specifically - * enqueued for that worker's particular Data Plane. - */ - WorkerPlane getWorkerPlane(); - // Worker - Control Plane configs /** @@ -717,9 +708,4 @@ enum SecretPersistenceType { VAULT } - enum WorkerPlane { - CONTROL_PLANE, - DATA_PLANE - } - } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 10e4e751e0ab3..54100a059b108 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -131,7 +131,6 @@ public class EnvConfigs implements Configs { private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS"; private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS"; private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS"; - private static final String WORKER_PLANE = "WORKER_PLANE"; // Worker - Control plane configs private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name() @@ -229,8 +228,6 @@ public EnvConfigs(final Map envMap) { this.getAllEnvKeys = envMap::keySet; this.logConfigs = new LogConfigs(getLogConfiguration()); this.stateStorageCloudConfigs = getStateStorageConfiguration().orElse(null); - - validateSyncWorkflowConfigs(); } private Optional getLogConfiguration() { @@ -931,11 +928,6 @@ public boolean shouldRunConnectionManagerWorkflows() { return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true); } - @Override - public WorkerPlane getWorkerPlane() { - return getEnvOrDefault(WORKER_PLANE, WorkerPlane.CONTROL_PLANE, s -> WorkerPlane.valueOf(s.toUpperCase())); - } - // Worker - Control plane @Override @@ -973,22 +965,6 @@ public String getDataPlaneServiceAccountEmail() { return getEnvOrDefault(DATA_PLANE_SERVICE_ACCOUNT_EMAIL, ""); } - /** - * Ensures the user hasn't configured themselves into a corner by making sure that the worker is set - * up to properly process sync workflows. With sensible defaults, it should be hard to fail this - * validation, but this provides a safety net regardless. - */ - private void validateSyncWorkflowConfigs() { - if (shouldRunSyncWorkflows()) { - if (getWorkerPlane().equals(WorkerPlane.DATA_PLANE) && getDataSyncTaskQueues().isEmpty()) { - throw new IllegalArgumentException(String.format( - "When %s is true, the worker must either be configured as a Control Plane worker, or %s must be non-empty.", - SHOULD_RUN_SYNC_WORKFLOWS, - DATA_SYNC_TASK_QUEUES)); - } - } - } - @Override public Set getTemporalWorkerPorts() { final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, ""); diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 4f3303c94df7f..f477000ffcbae 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -114,7 +114,7 @@ run { environment 'AIRBYTE_ROLE', System.getenv('AIRBYTE_ROLE') environment 'AIRBYTE_VERSION', env.VERSION - environment 'MICRONAUT_ENVIRONMENTS', 'local,control' + environment 'MICRONAUT_ENVIRONMENTS', 'control-plane' } task cloudStorageIntegrationTest(type: Test) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java index 338bf4472354f..c7c11da1a3f8b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java @@ -12,7 +12,6 @@ import io.airbyte.config.Configs.DeploymentMode; import io.airbyte.config.Configs.TrackingStrategy; import io.airbyte.config.Configs.WorkerEnvironment; -import io.airbyte.config.Configs.WorkerPlane; import io.airbyte.config.MaxWorkersConfig; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; @@ -23,6 +22,7 @@ import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.metrics.lib.MetricEmittingApps; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.process.KubePortManagerSingleton; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl; @@ -138,21 +138,21 @@ public class ApplicationInitializer implements ApplicationEventListener checkConnectionActivities( final CheckConnectionActivity checkConnectionActivity) { @@ -52,8 +51,7 @@ public List checkConnectionActivities( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("connectionManagerActivities") public List connectionManagerActivities( final GenerateInputActivity generateInputActivity, @@ -79,8 +77,7 @@ public List connectionManagerActivities( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("discoverActivities") public List discoverActivities( final DiscoverCatalogActivity discoverCatalogActivity) { @@ -88,8 +85,7 @@ public List discoverActivities( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("specActivities") public List specActivities( final SpecActivity specActivity) { @@ -154,8 +150,7 @@ public ActivityOptions shortActivityOptions(@Property(name = "airbyte.activity.m } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("specActivityOptions") public ActivityOptions specActivityOptions() { return ActivityOptions.newBuilder() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java index ce4cee032610d..4d7e319b0f4de 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java @@ -9,11 +9,11 @@ import com.auth0.jwt.algorithms.Algorithm; import com.google.auth.oauth2.ServiceAccountCredentials; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.config.Configs.WorkerPlane; import io.micronaut.context.BeanProvider; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Prototype; import io.micronaut.context.annotation.Value; +import io.micronaut.context.env.Environment; import jakarta.inject.Named; import jakarta.inject.Singleton; import java.io.FileInputStream; @@ -58,10 +58,10 @@ public AirbyteApiClient airbyteApiClient( @Singleton @Named("internalApiScheme") - public String internalApiScheme(final WorkerPlane workerPlane) { + public String internalApiScheme(final Environment environment) { // control plane workers communicate with the Airbyte API within their internal network, so https // isn't needed - return WorkerPlane.CONTROL_PLANE.equals(workerPlane) ? "http" : "https"; + return environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE) ? "http" : "https"; } /** @@ -81,12 +81,12 @@ public String internalApiAuthToken( @Value("${airbyte.control.plane.auth-endpoint}") final String controlPlaneAuthEndpoint, @Value("${airbyte.data.plane.service-account.email}") final String dataPlaneServiceAccountEmail, @Value("${airbyte.data.plane.service-account.credentials-path}") final String dataPlaneServiceAccountCredentialsPath, - final WorkerPlane workerPlane) { - if (WorkerPlane.CONTROL_PLANE.equals(workerPlane)) { + final Environment environment) { + if (environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) { // control plane workers communicate with the Airbyte API within their internal network, so a signed // JWT isn't needed return airbyteApiAuthHeaderValue; - } else if (WorkerPlane.DATA_PLANE.equals(workerPlane)) { + } else if (environment.getActiveNames().contains(WorkerMode.DATA_PLANE)) { try { final Date now = new Date(); final Date expTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(JWT_TTL_MINUTES)); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java index 91a5341f84a41..c907c93bd875a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java @@ -13,7 +13,6 @@ import io.airbyte.config.Configs.SecretPersistenceType; import io.airbyte.config.Configs.TrackingStrategy; import io.airbyte.config.Configs.WorkerEnvironment; -import io.airbyte.config.Configs.WorkerPlane; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.StatePersistence; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; @@ -73,11 +72,6 @@ public WorkerEnvironment workerEnvironment(@Value("${airbyte.worker.env}") final return convertToEnum(workerEnv, WorkerEnvironment::valueOf, WorkerEnvironment.DOCKER); } - @Singleton - public WorkerPlane workerPlane(@Value("${airbyte.worker.plane}") final String workerPlane) { - return convertToEnum(workerPlane, WorkerPlane::valueOf, WorkerPlane.CONTROL_PLANE); - } - @Singleton @Named("workspaceRoot") public Path workspaceRoot(@Value("${airbyte.workspace.root}") final String workspaceRoot) { @@ -106,8 +100,7 @@ public FeatureFlags featureFlags() { } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public JobNotifier jobNotifier( final ConfigRepository configRepository, final TrackingClient trackingClient, @@ -121,8 +114,7 @@ public JobNotifier jobNotifier( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public JobTracker jobTracker( final ConfigRepository configRepository, final JobPersistence jobPersistence, @@ -131,8 +123,7 @@ public JobTracker jobTracker( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags) { return JsonSecretsProcessor.builder() .maskSecrets(!featureFlags.exposeSecretsInExport()) @@ -141,15 +132,13 @@ public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public WebUrlHelper webUrlHelper(@Value("${airbyte.web-app.url}") final String webAppUrl) { return new WebUrlHelper(webAppUrl); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public WorkspaceHelper workspaceHelper( final ConfigRepository configRepository, final JobPersistence jobPersistence) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java index 844fc4188b60a..9403a1c21014b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/DatabaseBeanFactory.java @@ -42,24 +42,21 @@ public class DatabaseBeanFactory { private static final String INSTALLED_BY = "WorkerApp"; @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("configDatabase") public Database configDatabase(@Named("config") final DSLContext dslContext) throws IOException { return new Database(dslContext); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("jobsDatabase") public Database jobsDatabase(@Named("jobs") final DSLContext dslContext) throws IOException { return new Database(dslContext); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("configFlyway") public Flyway configFlyway(@Named("config") final FlywayConfigurationProperties configFlywayConfigurationProperties, @Named("config") final DataSource configDataSource, @@ -75,8 +72,7 @@ public Flyway configFlyway(@Named("config") final FlywayConfigurationProperties } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("jobsFlyway") public Flyway jobsFlyway(@Named("jobs") final FlywayConfigurationProperties jobsFlywayConfigurationProperties, @Named("jobs") final DataSource jobsDataSource, @@ -92,45 +88,39 @@ public Flyway jobsFlyway(@Named("jobs") final FlywayConfigurationProperties jobs } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public ConfigPersistence configPersistence(@Named("configDatabase") final Database configDatabase, final JsonSecretsProcessor jsonSecretsProcessor) { return DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public ConfigRepository configRepository(@Named("configPersistence") final ConfigPersistence configPersistence, @Named("configDatabase") final Database configDatabase) { return new ConfigRepository(configPersistence, configDatabase); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public JobPersistence jobPersistence(@Named("jobsDatabase") final Database jobDatabase) { return new DefaultJobPersistence(jobDatabase); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public StatePersistence statePersistence(@Named("configDatabase") final Database configDatabase) { return new StatePersistence(configDatabase); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public StreamResetPersistence streamResetPersistence(@Named("configDatabase") final Database configDatabase) { return new StreamResetPersistence(configDatabase); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("configsDatabaseMigrationCheck") public DatabaseMigrationCheck configsDatabaseMigrationCheck(@Named("config") final DSLContext dslContext, @Named("configFlyway") final Flyway configsFlyway, @@ -143,8 +133,7 @@ public DatabaseMigrationCheck configsDatabaseMigrationCheck(@Named("config") fin } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("jobsDatabaseMigrationCheck") public DatabaseMigrationCheck jobsDatabaseMigrationCheck(@Named("jobs") final DSLContext dslContext, @Named("jobsFlyway") final Flyway jobsFlyway, @@ -156,8 +145,7 @@ public DatabaseMigrationCheck jobsDatabaseMigrationCheck(@Named("jobs") final DS } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("jobsDatabaseAvailabilityCheck") public JobsDatabaseAvailabilityCheck jobsDatabaseAvailabilityCheck(@Named("jobs") final DSLContext dslContext) { return new JobsDatabaseAvailabilityCheck(dslContext, DatabaseConstants.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java index 9faf2acf27904..f5ba3350d35a3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java @@ -30,8 +30,7 @@ public class JobErrorReportingBeanFactory { @Singleton @Requires(property = "airbyte.worker.job.error-reporting.strategy", pattern = "(?i)^sentry$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("jobErrorReportingClient") public JobErrorReportingClient sentryJobErrorReportingClient( @Value("${airbyte.worker.job.error-reporting.sentry.dsn}") final String sentryDsn) { @@ -41,16 +40,14 @@ public JobErrorReportingClient sentryJobErrorReportingClient( @Singleton @Requires(property = "airbyte.worker.job.error-reporting.strategy", pattern = "(?i)^logging$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("jobErrorReportingClient") public JobErrorReportingClient loggingJobErrorReportingClient() { return new LoggingJobErrorReportingClient(); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public JobErrorReporter jobErrorReporter( @Value("${airbyte.version}") final String airbyteVersion, final ConfigRepository configRepository, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java index 5bb15020bfb46..43bb43a04e069 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java @@ -30,8 +30,7 @@ public class ProcessFactoryBeanFactory { @Singleton @Requires(property = "airbyte.worker.env", pattern = "(?i)^(?!kubernetes$).*") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("checkProcessFactory") public ProcessFactory checkDockerProcessFactory( @Named("checkWorkerConfigs") final WorkerConfigs workerConfigs, @@ -52,8 +51,7 @@ public ProcessFactory checkDockerProcessFactory( @Singleton @Requires(property = "airbyte.worker.env", pattern = "(?i)^kubernetes$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("checkProcessFactory") public ProcessFactory checkKubernetesProcessFactory( @Named("checkWorkerConfigs") final WorkerConfigs workerConfigs, @@ -102,8 +100,7 @@ public ProcessFactory defaultKubernetesProcessFactory( @Singleton @Requires(property = "airbyte.worker.env", pattern = "(?i)^(?!kubernetes$).*") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("discoverProcessFactory") public ProcessFactory discoverDockerProcessFactory( @Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, @@ -124,8 +121,7 @@ public ProcessFactory discoverDockerProcessFactory( @Singleton @Requires(property = "airbyte.worker.env", pattern = "(?i)^kubernetes$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("discoverProcessFactory") public ProcessFactory discoverKubernetesProcessFactory( @Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, @@ -174,8 +170,7 @@ public ProcessFactory replicationKubernetesProcessFactory( @Singleton @Requires(property = "airbyte.worker.env", pattern = "(?i)^(?!kubernetes$).*") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("specProcessFactory") public ProcessFactory specDockerProcessFactory( @Named("specWorkerConfigs") final WorkerConfigs workerConfigs, @@ -196,8 +191,7 @@ public ProcessFactory specDockerProcessFactory( @Singleton @Requires(property = "airbyte.worker.env", pattern = "(?i)^kubernetes$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("specProcessFactory") public ProcessFactory specKubernetesProcessFactory( @Named("specWorkerConfigs") final WorkerConfigs workerConfigs, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java index fb42c04e7eca3..1b15ce5b2c4b3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/SecretPersistenceBeanFactory.java @@ -31,8 +31,7 @@ public class SecretPersistenceBeanFactory { pattern = "(?i)^(?!google_secret_manager).*") @Requires(property = "airbyte.secret.persistence", pattern = "(?i)^(?!vault).*") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("secretPersistence") public SecretPersistence defaultSecretPersistence(@Named("configDatabase") final Database configDatabase) { return localTestingSecretPersistence(configDatabase); @@ -41,8 +40,7 @@ public SecretPersistence defaultSecretPersistence(@Named("configDatabase") final @Singleton @Requires(property = "airbyte.secret.persistence", pattern = "(?i)^testing_config_db_table$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("secretPersistence") public SecretPersistence localTestingSecretPersistence(@Named("configDatabase") final Database configDatabase) { return new LocalTestingSecretPersistence(configDatabase); @@ -60,8 +58,7 @@ public SecretPersistence googleSecretPersistence(@Value("${airbyte.secret.store. @Singleton @Requires(property = "airbyte.secret.persistence", pattern = "(?i)^vault$") - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("secretPersistence") public SecretPersistence vaultSecretPersistence(@Value("${airbyte.secret.store.vault.address}") final String address, @Value("${airbyte.secret.store.vault.prefix}") final String prefix, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java index d11492ce31300..b09c5c75aaea6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java @@ -32,15 +32,13 @@ public class TemporalBeanFactory { @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public TrackingClient trackingClient() { return TrackingClientSingleton.get(); } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public SyncJobFactory jobFactory( final ConfigRepository configRepository, @Property(name = "airbyte.connector.specific-resource-defaults-enabled", @@ -55,8 +53,7 @@ public SyncJobFactory jobFactory( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) public TemporalWorkerRunFactory temporalWorkerRunFactory( @Value("${airbyte.version}") final String airbyteVersion, final FeatureFlags featureFlags, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerConfigurationBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerConfigurationBeanFactory.java index f14fa91cc8c69..1e476a96948fa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerConfigurationBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerConfigurationBeanFactory.java @@ -195,8 +195,7 @@ public ResourceRequirements replicationResourceRequirements( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("checkWorkerConfigs") public WorkerConfigs checkWorkerConfigs( final WorkerEnvironment workerEnvironment, @@ -257,8 +256,7 @@ public WorkerConfigs defaultWorkerConfigs( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("discoverWorkerConfigs") public WorkerConfigs discoverWorkerConfigs( final WorkerEnvironment workerEnvironment, @@ -319,8 +317,7 @@ public WorkerConfigs replicationWorkerConfigs( } @Singleton - @Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") + @Requires(env = WorkerMode.CONTROL_PLANE) @Named("specWorkerConfigs") public WorkerConfigs specWorkerConfigs( final WorkerEnvironment workerEnvironment, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerMode.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerMode.java new file mode 100644 index 0000000000000..66e164b2fbca6 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/WorkerMode.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.config; + +/** + * Defines the different execution modes for the workers application. + */ +public final class WorkerMode { + + private WorkerMode() {} + + /** + * Control plane environment/mode. + */ + public static final String CONTROL_PLANE = "control-plane"; + + /** + * Data plane environment/mode. + */ + public static final String DATA_PLANE = "data-plane"; + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java b/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java index 1714d90e0cc29..3232b25abbecf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java @@ -17,6 +17,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.WorkspaceHelper; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.config.WorkerMode; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import java.io.IOException; @@ -27,8 +28,7 @@ // todo (cgardens) - we are not getting any value out of instantiating this class. we should just // use it as statics. not doing it now, because already in the middle of another refactor. @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class ConnectionHelper { private final ConfigRepository configRepository; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index d4f2ddb2141fa..8d6e7514dfe17 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -25,6 +25,7 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.exception.DeletedWorkflowException; @@ -65,8 +66,7 @@ @Slf4j @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class TemporalClient { /** diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 8eff5717a3517..ee19ef5ad5535 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -18,6 +18,7 @@ import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.general.DefaultCheckConnectionWorker; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; @@ -32,8 +33,7 @@ import java.nio.file.Path; @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class CheckConnectionActivityImpl implements CheckConnectionActivity { private final WorkerConfigs workerConfigs; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index aaf6f3aff0ab7..3721bd97f90e8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -17,6 +17,7 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.general.DefaultDiscoverCatalogWorker; import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; @@ -34,8 +35,7 @@ import lombok.extern.slf4j.Slf4j; @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) @Slf4j public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java index 502fa89ccb2f5..67809505267f9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java @@ -19,6 +19,7 @@ import io.airbyte.persistence.job.models.JobStatus; import io.airbyte.persistence.job.models.JobWithStatusAndTimestamp; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; @@ -29,8 +30,7 @@ import java.util.concurrent.TimeUnit; @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class AutoDisableConnectionActivityImpl implements AutoDisableConnectionActivity { private final ConfigRepository configRepository; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index d6345f1287476..c6909a4c28753 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -14,6 +14,7 @@ import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.models.Job; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; @@ -34,8 +35,7 @@ @Slf4j @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class ConfigFetchActivityImpl implements ConfigFetchActivity { private final static long MS_PER_SECOND = 1000L; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java index 593b3dec9b67c..5b138443b74d0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java @@ -6,6 +6,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.helper.ConnectionHelper; import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; @@ -13,8 +14,7 @@ import java.io.IOException; @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity { private final ConnectionHelper connectionHelper; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 079fb8b34ad1c..503ba9bc8e023 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -16,14 +16,14 @@ import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import java.util.List; @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class GenerateInputActivityImpl implements GenerateInputActivity { private final JobPersistence jobPersistence; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index ecf64d808fd33..354b563638677 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -41,6 +41,7 @@ import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.JobStatus; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.run.TemporalWorkerRunFactory; import io.airbyte.workers.run.WorkerRun; @@ -56,8 +57,7 @@ @Slf4j @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndStatusUpdateActivity { private final SyncJobFactory jobFactory; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java index 3447f37c60a73..870a96b0e9381 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java @@ -8,6 +8,7 @@ import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricTags; +import io.airbyte.workers.config.WorkerMode; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import java.util.ArrayList; @@ -22,8 +23,7 @@ */ @Slf4j @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class RecordMetricActivityImpl implements RecordMetricActivity { private final MetricClient metricClient; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java index 90fa9e9341a6a..e4b2014dce23e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.temporal.StreamResetRecordsHelper; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; @@ -11,8 +12,7 @@ @Slf4j @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class StreamResetActivityImpl implements StreamResetActivity { private final StreamResetRecordsHelper streamResetRecordsHelper; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java index 6806ed1052f7b..558868f04a4b1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.workers.config.WorkerMode; import io.micronaut.context.annotation.Property; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; @@ -16,8 +17,7 @@ */ @Slf4j @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class WorkflowConfigActivityImpl implements WorkflowConfigActivity { private final Long workflowRestartDelaySeconds; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index 717d18a781f13..0f3eb998bf96f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -15,6 +15,7 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.general.DefaultGetSpecWorker; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; @@ -30,8 +31,7 @@ import java.util.function.Supplier; @Singleton -@Requires(property = "airbyte.worker.plane", - pattern = "(?i)^(?!data_plane).*") +@Requires(env = WorkerMode.CONTROL_PLANE) public class SpecActivityImpl implements SpecActivity { private final WorkerConfigs workerConfigs; diff --git a/airbyte-workers/src/main/resources/application-control.yml b/airbyte-workers/src/main/resources/application-control-plane.yml similarity index 100% rename from airbyte-workers/src/main/resources/application-control.yml rename to airbyte-workers/src/main/resources/application-control-plane.yml diff --git a/airbyte-workers/src/main/resources/application.yml b/airbyte-workers/src/main/resources/application.yml index 3b5af0c7a6d8f..54b95f0b2f3bd 100644 --- a/airbyte-workers/src/main/resources/application.yml +++ b/airbyte-workers/src/main/resources/application.yml @@ -149,7 +149,6 @@ airbyte: memory: limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT:} request: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST:} - plane: ${WORKER_PLANE:CONTROL_PLANE} replication: orchestrator: cpu: diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index 992f8beb4d022..b9945de80ed21 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -63,7 +63,7 @@ data: USE_STREAM_CAPABLE_STATE: "true" CONTAINER_ORCHESTRATOR_ENABLED: {{ .Values.worker.containerOrchestrator.enabled | quote }} CONTAINER_ORCHESTRATOR_IMAGE: {{ .Values.worker.containerOrchestrator.image | quote }} - WORKERS_MICRONAUT_ENVIRONMENTS: "control" + WORKERS_MICRONAUT_ENVIRONMENTS: "control-plane" WORKER_LOGS_STORAGE_TYPE: {{ .Values.global.logs.storage.type | quote }} WORKER_STATE_STORAGE_TYPE: {{ .Values.global.state.storage.type | quote }} {{- end }} diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index aeee113ab00b1..4f3226e4eb0f5 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -58,7 +58,7 @@ NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT= NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST= # Worker # -WORKERS_MICRONAUT_ENVIRONMENTS=control +WORKERS_MICRONAUT_ENVIRONMENTS=control-plane # Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index a19795e0988c5..0ddf0bb2800b3 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -60,7 +60,7 @@ NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT= NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST= # Worker # -WORKERS_MICRONAUT_ENVIRONMENTS=control +WORKERS_MICRONAUT_ENVIRONMENTS=control-plane # Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 99c05c7691ca5..f4152cdb52614 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -55,7 +55,7 @@ JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= # Worker # -WORKERS_MICRONAUT_ENVIRONMENTS=control +WORKERS_MICRONAUT_ENVIRONMENTS=control-plane # Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index cef9f7ab537b8..136529e1cb37f 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -60,7 +60,7 @@ NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT= NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST= # Worker # -WORKERS_MICRONAUT_ENVIRONMENTS=control +WORKERS_MICRONAUT_ENVIRONMENTS=control-plane # Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS=