From 90f011c039e746eb68941f24904c75fcc415c224 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 12 Jan 2022 13:39:53 -0800 Subject: [PATCH 1/2] Only migrate active and disable connection --- .../io/airbyte/scheduler/app/SchedulerApp.java | 18 ++++++++---------- .../main/java/io/airbyte/server/ServerApp.java | 8 +++++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index b807255cf9017..cc9f4653c6125 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -56,13 +56,11 @@ import org.slf4j.MDC; /** - * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch - * them. The current implementation uses two thread pools to do so. One pool is responsible for all - * job launching operations. The other pool is responsible for clean up operations. + * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch them. The current implementation uses two thread + * pools to do so. One pool is responsible for all job launching operations. The other pool is responsible for clean up operations. *

- * Operations can have thread pools under the hood. An important thread pool to note is that the job - * submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this - * pool determines the number of concurrent jobs that can be run. This is controlled via the + * Operations can have thread pools under the hood. An important thread pool to note is that the job submitter thread pool. This pool does the work of + * submitting jobs to temporal - the size of this pool determines the number of concurrent jobs that can be run. This is controlled via the * SUBMITTER_NUM_THREADS variable of EnvConfigs. */ public class SchedulerApp { @@ -140,7 +138,7 @@ public void start() throws IOException { // anymore. cleanupZombies(jobPersistence, jobNotifier); - LOGGER.error("Start running the old scheduler"); + LOGGER.info("Start running the old scheduler"); scheduleJobsPool.scheduleWithFixedDelay( () -> { MDC.setContextMap(mdc); @@ -234,13 +232,13 @@ public static void main(final String[] args) throws IOException, InterruptedExce configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()) - .getInitialized(); + .getInitialized(); final Database configDatabase = new ConfigsDatabaseInstance( configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) - .getInitialized(); + .getInitialized(); final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation(); final Optional secretPersistence = SecretPersistence.getLongLived(configs); final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs); @@ -283,7 +281,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce Integer.parseInt(configs.getSubmitterNumThreads()), configs.getSyncJobMaxAttempts(), configs.getAirbyteVersionOrWarning(), configs.getWorkerEnvironment(), configs.getLogConfigs()) - .start(); + .start(); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 8daabdc64fd26..dcbf9fa290f43 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -13,6 +13,7 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.StandardSync.Status; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.init.YamlSeedConfigPersistence; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -235,14 +236,15 @@ private static void migrateExistingConnection(final ConfigRepository configRepos throws JsonValidationException, ConfigNotFoundException, IOException { LOGGER.info("Start migration to the new scheduler..."); final Set connectionIds = - configRepository.listStandardSyncs().stream().map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet()); + configRepository.listStandardSyncs().stream() + .filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE) + .map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet()); temporalWorkerRunFactory.migrateSyncIfNeeded(connectionIds); LOGGER.info("Done migrating to the new scheduler..."); } /** - * Copy paste from {@link io.airbyte.scheduler.app.SchedulerApp} which will be removed in a near - * future + * Copy paste from {@link io.airbyte.scheduler.app.SchedulerApp} which will be removed in a near future * * @param jobPersistence * @param jobNotifier From 1fb68de6425e43df0e82194892811aafcf1cbf09 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 12 Jan 2022 13:45:28 -0800 Subject: [PATCH 2/2] format --- .../io/airbyte/scheduler/app/SchedulerApp.java | 16 +++++++++------- .../main/java/io/airbyte/server/ServerApp.java | 3 ++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index cc9f4653c6125..00d489b636711 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -56,11 +56,13 @@ import org.slf4j.MDC; /** - * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch them. The current implementation uses two thread - * pools to do so. One pool is responsible for all job launching operations. The other pool is responsible for clean up operations. + * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch + * them. The current implementation uses two thread pools to do so. One pool is responsible for all + * job launching operations. The other pool is responsible for clean up operations. *

- * Operations can have thread pools under the hood. An important thread pool to note is that the job submitter thread pool. This pool does the work of - * submitting jobs to temporal - the size of this pool determines the number of concurrent jobs that can be run. This is controlled via the + * Operations can have thread pools under the hood. An important thread pool to note is that the job + * submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this + * pool determines the number of concurrent jobs that can be run. This is controlled via the * SUBMITTER_NUM_THREADS variable of EnvConfigs. */ public class SchedulerApp { @@ -232,13 +234,13 @@ public static void main(final String[] args) throws IOException, InterruptedExce configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()) - .getInitialized(); + .getInitialized(); final Database configDatabase = new ConfigsDatabaseInstance( configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) - .getInitialized(); + .getInitialized(); final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation(); final Optional secretPersistence = SecretPersistence.getLongLived(configs); final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs); @@ -281,7 +283,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce Integer.parseInt(configs.getSubmitterNumThreads()), configs.getSyncJobMaxAttempts(), configs.getAirbyteVersionOrWarning(), configs.getWorkerEnvironment(), configs.getLogConfigs()) - .start(); + .start(); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index dcbf9fa290f43..ba13a25ec28c8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -244,7 +244,8 @@ private static void migrateExistingConnection(final ConfigRepository configRepos } /** - * Copy paste from {@link io.airbyte.scheduler.app.SchedulerApp} which will be removed in a near future + * Copy paste from {@link io.airbyte.scheduler.app.SchedulerApp} which will be removed in a near + * future * * @param jobPersistence * @param jobNotifier