diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java
new file mode 100644
index 0000000000000..a1d3222c1d781
--- /dev/null
+++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java
@@ -0,0 +1,161 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.config.persistence;
+
+import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.airbyte.commons.resources.MoreResources;
+import io.airbyte.config.Configs;
+import io.airbyte.db.Database;
+import io.airbyte.db.Databases;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * By default, this factory returns a database config persistence. it can still return a file system
+ * config persistence for testing purpose. This legacy feature should be removed after the file to
+ * database migration is completely done.
+ */
+public class ConfigPersistenceBuilder {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistenceBuilder.class);
+
+ @VisibleForTesting
+ static class ConfigPersistenceFactory {
+
+ private final Configs configs;
+ private final boolean setupDatabase;
+ private final boolean useConfigDatabase;
+
+ /**
+ * @param setupDatabase initialize the database and load data; this is necessary because this method
+ * has multiple callers, and we want to setup the database only once to prevent race
+ * conditions.
+ */
+ @VisibleForTesting
+ ConfigPersistenceFactory(Configs configs, boolean setupDatabase, boolean useConfigDatabase) {
+ this.configs = configs;
+ this.setupDatabase = setupDatabase;
+ this.useConfigDatabase = useConfigDatabase;
+ }
+
+ /**
+ * Create a config persistence based on the configs.
+ *
+ * If config root is defined, create a database config persistence and copy the configs from the
+ * file-based config persistence. Otherwise, seed the database from the yaml files.
+ */
+ public ConfigPersistence create() throws IOException {
+ if (!useConfigDatabase) {
+ Path configRoot = configs.getConfigRoot();
+ LOGGER.info("Use file system config persistence (root: {})", configRoot);
+ return FileSystemConfigPersistence.createWithValidation(configRoot);
+ }
+
+ if (configs.getConfigRoot() == null) {
+ // This branch will only be true in a future Airbyte version, in which
+ // the config root is no longer required and everything lives in the database.
+ return createDbPersistenceWithYamlSeed();
+ }
+
+ return createDbPersistenceWithFileSeed();
+ }
+
+ @VisibleForTesting
+ ConfigPersistence createDbPersistenceWithYamlSeed() throws IOException {
+ ConfigPersistence seedConfigPersistence = new YamlSeedConfigPersistence();
+ return createDbPersistence(seedConfigPersistence);
+ }
+
+ @VisibleForTesting
+ ConfigPersistence createDbPersistenceWithFileSeed() throws IOException {
+ Path configRoot = configs.getConfigRoot();
+ ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot);
+ return createDbPersistence(fsConfigPersistence);
+ }
+
+ @VisibleForTesting
+ ConfigPersistence createDbPersistence(ConfigPersistence seedConfigPersistence) throws IOException {
+ LOGGER.info("Use database config persistence.");
+
+ // When setupDatabase is true, it means the database will be initialized after we
+ // connect to the database. So the database itself is considered ready as long as
+ // the connection is alive. Otherwise, the database is expected to have full data.
+ Function isReady = setupDatabase
+ ? Databases.IS_CONFIG_DATABASE_CONNECTED
+ : Databases.IS_CONFIG_DATABASE_LOADED_WITH_DATA;
+
+ Database database = Databases.createPostgresDatabaseWithRetry(
+ configs.getConfigDatabaseUser(),
+ configs.getConfigDatabasePassword(),
+ configs.getConfigDatabaseUrl(),
+ isReady);
+
+ DatabaseConfigPersistence dbConfigPersistence = new DatabaseConfigPersistence(database);
+ if (setupDatabase) {
+ dbConfigPersistence.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA));
+ dbConfigPersistence.loadData(seedConfigPersistence);
+ }
+
+ return new ValidatingConfigPersistence(dbConfigPersistence);
+ }
+
+ }
+
+ public static Builder builder(Configs configs) {
+ return new Builder(configs);
+ }
+
+ public static class Builder {
+
+ private final Configs configs;
+ private boolean setupDatabase = false;
+ private boolean useConfigDatabase = true;
+
+ private Builder(Configs configs) {
+ this.configs = configs;
+ }
+
+ public Builder setupDatabase(boolean setupDatabase) {
+ this.setupDatabase = setupDatabase;
+ return this;
+ }
+
+ public Builder useConfigDatabase(boolean useConfigDatabase) {
+ this.useConfigDatabase = useConfigDatabase;
+ return this;
+ }
+
+ public ConfigPersistence build() throws IOException {
+ return new ConfigPersistenceFactory(configs, setupDatabase, useConfigDatabase).create();
+ }
+
+ }
+
+}
diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceFactory.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceFactory.java
deleted file mode 100644
index 7fe985a274cb6..0000000000000
--- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceFactory.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 Airbyte
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package io.airbyte.config.persistence;
-
-import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;
-
-import io.airbyte.commons.resources.MoreResources;
-import io.airbyte.config.Configs;
-import io.airbyte.db.Database;
-import io.airbyte.db.Databases;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.function.Function;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * By default, this factory returns a database config persistence. it can still return a file system
- * config persistence for testing purpose. This legacy feature should be removed after the file to
- * database migration is completely done.
- */
-public class ConfigPersistenceFactory {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistenceFactory.class);
-
- private final Configs configs;
- private final boolean setupDatabase;
- private final boolean useConfigDatabase;
-
- /**
- * @param setupDatabase initialize the database and load data; this is necessary because this method
- * has multiple callers, and we want to setup the database only once to prevent race
- * conditions.
- */
- private ConfigPersistenceFactory(Configs configs, boolean setupDatabase, boolean useConfigDatabase) {
- this.configs = configs;
- this.setupDatabase = setupDatabase;
- this.useConfigDatabase = useConfigDatabase;
- }
-
- public static Builder build(Configs configs) {
- return new Builder(configs);
- }
-
- public static class Builder {
-
- private final Configs configs;
- private boolean setupDatabase = false;
- private boolean useConfigDatabase = true;
-
- private Builder(Configs configs) {
- this.configs = configs;
- }
-
- public Builder setupDatabase(boolean setupDatabase) {
- this.setupDatabase = setupDatabase;
- return this;
- }
-
- public Builder useConfigDatabase(boolean useConfigDatabase) {
- this.useConfigDatabase = useConfigDatabase;
- return this;
- }
-
- public ConfigPersistenceFactory get() {
- return new ConfigPersistenceFactory(configs, setupDatabase, useConfigDatabase);
- }
-
- }
-
- /**
- * Create a config persistence based on the configs.
- *
- * If config root is defined, create a database config persistence and copy the configs from the
- * file-based config persistence. Otherwise, seed the database from the yaml files.
- */
- public ConfigPersistence create() throws IOException {
- if (!useConfigDatabase) {
- Path configRoot = configs.getConfigRoot();
- LOGGER.info("Use file system config persistence (root: {})", configRoot);
- return FileSystemConfigPersistence.createWithValidation(configRoot);
- }
-
- if (configs.getConfigRoot() == null) {
- // This branch will only be true in a future Airbyte version, in which
- // the config root is no longer required and everything lives in the database.
- return createDbPersistenceWithYamlSeed();
- }
-
- return createDbPersistenceWithFileSeed();
- }
-
- ConfigPersistence createDbPersistenceWithYamlSeed() throws IOException {
- ConfigPersistence seedConfigPersistence = new YamlSeedConfigPersistence();
- return createDbPersistence(seedConfigPersistence);
- }
-
- ConfigPersistence createDbPersistenceWithFileSeed() throws IOException {
- Path configRoot = configs.getConfigRoot();
- ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot);
- return createDbPersistence(fsConfigPersistence);
- }
-
- ConfigPersistence createDbPersistence(ConfigPersistence seedConfigPersistence) throws IOException {
- LOGGER.info("Use database config persistence.");
-
- // When setupDatabase is true, it means the database will be initialized after we
- // connect to the database. So the database itself is considered ready as long as
- // the connection is alive. Otherwise, the database is expected to have full data.
- Function isReady = setupDatabase
- ? Databases.IS_CONFIG_DATABASE_CONNECTED
- : Databases.IS_CONFIG_DATABASE_LOADED_WITH_DATA;
-
- Database database = Databases.createPostgresDatabaseWithRetry(
- configs.getConfigDatabaseUser(),
- configs.getConfigDatabasePassword(),
- configs.getConfigDatabaseUrl(),
- isReady);
-
- DatabaseConfigPersistence dbConfigPersistence = new DatabaseConfigPersistence(database);
- if (setupDatabase) {
- dbConfigPersistence.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA));
- dbConfigPersistence.loadData(seedConfigPersistence);
- }
-
- return new ValidatingConfigPersistence(dbConfigPersistence);
- }
-
-}
diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceFactoryTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java
similarity index 89%
rename from airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceFactoryTest.java
rename to airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java
index b63326fa647b7..c22952ccd24f9 100644
--- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceFactoryTest.java
+++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java
@@ -44,6 +44,7 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs;
import io.airbyte.config.StandardWorkspace;
+import io.airbyte.config.persistence.ConfigPersistenceBuilder.ConfigPersistenceFactory;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import java.io.IOException;
@@ -64,7 +65,7 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
-class ConfigPersistenceFactoryTest extends BaseTest {
+class ConfigPersistenceBuilderTest extends BaseTest {
private static PostgreSQLContainer> container;
private static Configs configs;
@@ -103,7 +104,7 @@ void tearDown() throws Exception {
@Test
public void testCreateDbPersistenceWithYamlSeed() throws IOException {
- ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().createDbPersistenceWithYamlSeed();
+ ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, true, false).createDbPersistenceWithYamlSeed();
ConfigPersistence seedPersistence = new YamlSeedConfigPersistence();
assertSameConfigDump(seedPersistence.dumpConfigs(), dbPersistence.dumpConfigs());
}
@@ -111,14 +112,14 @@ public void testCreateDbPersistenceWithYamlSeed() throws IOException {
@Test
public void testCreateDbPersistenceWithFileSeed() throws Exception {
Path testRoot = Path.of("/tmp/cpf_test_file_seed");
- Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceFactoryTest.class.getName());
+ Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath);
writeSource(seedPersistence, SOURCE_GITHUB);
writeDestination(seedPersistence, DESTINATION_S3);
when(configs.getConfigRoot()).thenReturn(rootPath);
- ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().createDbPersistenceWithFileSeed();
+ ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, true, false).createDbPersistenceWithFileSeed();
int dbConfigSize = (int) dbPersistence.dumpConfigs().values().stream()
.map(stream -> stream.collect(Collectors.toList()))
.mapToLong(Collection::size)
@@ -147,7 +148,7 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception {
ConfigPersistence seedPersistence = spy(new YamlSeedConfigPersistence());
// When setupDatabase is false, the createDbPersistence method does not initialize
// the database itself, but it expects that the database has already been initialized.
- ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(false).get().createDbPersistence(seedPersistence);
+ ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, false, false).createDbPersistence(seedPersistence);
// The return persistence is not initialized by the seed persistence, and has only one config.
verify(seedPersistence, never()).dumpConfigs();
assertSameConfigDump(
@@ -158,14 +159,14 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception {
@Test
public void testCreateFileSystemConfigPersistence() throws Exception {
Path testRoot = Path.of("/tmp/cpf_test_file_system");
- Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceFactoryTest.class.getName());
+ Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath);
writeSource(seedPersistence, SOURCE_GITHUB);
writeDestination(seedPersistence, DESTINATION_S3);
when(configs.getConfigRoot()).thenReturn(rootPath);
- ConfigPersistence filePersistence = ConfigPersistenceFactory.build(configs).useConfigDatabase(false).get().create();
+ ConfigPersistence filePersistence = new ConfigPersistenceFactory(configs, true, false).create();
assertSameConfigDump(seedPersistence.dumpConfigs(), filePersistence.dumpConfigs());
}
@@ -187,10 +188,10 @@ public void testMigrateFromFileToDbPersistence() throws Exception {
// first run uses file system config persistence, and adds an extra workspace
Path testRoot = Path.of("/tmp/cpf_test_migration");
- Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceFactoryTest.class.getName());
+ Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
when(configs.getConfigRoot()).thenReturn(rootPath);
- ConfigPersistence filePersistence = ConfigPersistenceFactory.build(configs).useConfigDatabase(false).get().create();
+ ConfigPersistence filePersistence = new ConfigPersistenceFactory(configs, true, false).create();
filePersistence.replaceAllConfigs(seedConfigs, false);
filePersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, extraWorkspace.getWorkspaceId().toString(), extraWorkspace);
@@ -198,7 +199,7 @@ public void testMigrateFromFileToDbPersistence() throws Exception {
// second run uses database config persistence;
// the only difference is that useConfigDatabase is no longer overridden to false;
// the extra workspace should be ported to this persistence
- ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().create();
+ ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, true, false).create();
Map> expected = Map.of(
ConfigSchema.STANDARD_WORKSPACE.name(), Stream.of(Jsons.jsonNode(DEFAULT_WORKSPACE), Jsons.jsonNode(extraWorkspace)),
ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), Stream.of(Jsons.jsonNode(SOURCE_GITHUB), Jsons.jsonNode(SOURCE_POSTGRES)),
diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
index 34e0a1078b651..3902f8d9dc8d2 100644
--- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
+++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
@@ -32,7 +32,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigPersistence;
-import io.airbyte.config.persistence.ConfigPersistenceFactory;
+import io.airbyte.config.persistence.ConfigPersistenceBuilder;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
@@ -209,7 +209,7 @@ public static void main(String[] args) throws IOException, InterruptedException
final ProcessFactory processFactory = getProcessBuilderFactory(configs);
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
- final ConfigPersistence configPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(false).get().create();
+ final ConfigPersistence configPersistence = ConfigPersistenceBuilder.builder(configs).setupDatabase(false).build();
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
final JobCleaner jobCleaner = new JobCleaner(
configs.getWorkspaceRetentionConfig(),
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
index 1df6a3a7210b1..0af58d5080fa5 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -35,7 +35,7 @@
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
-import io.airbyte.config.persistence.ConfigPersistenceFactory;
+import io.airbyte.config.persistence.ConfigPersistenceBuilder;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.PersistenceConstants;
import io.airbyte.db.Database;
@@ -209,7 +209,7 @@ public static void runServer(final Set requestFilters,
MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getServerLogsRoot(configs).toString());
LOGGER.info("Creating config repository...");
- final ConfigPersistence configPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().create();
+ final ConfigPersistence configPersistence = ConfigPersistenceBuilder.builder(configs).setupDatabase(true).build();
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
// hack: upon installation we need to assign a random customerId so that when