diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java new file mode 100644 index 0000000000000..a1d3222c1d781 --- /dev/null +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java @@ -0,0 +1,161 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.persistence; + +import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.Configs; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import java.io.IOException; +import java.nio.file.Path; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * By default, this factory returns a database config persistence. it can still return a file system + * config persistence for testing purpose. This legacy feature should be removed after the file to + * database migration is completely done. + */ +public class ConfigPersistenceBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistenceBuilder.class); + + @VisibleForTesting + static class ConfigPersistenceFactory { + + private final Configs configs; + private final boolean setupDatabase; + private final boolean useConfigDatabase; + + /** + * @param setupDatabase initialize the database and load data; this is necessary because this method + * has multiple callers, and we want to setup the database only once to prevent race + * conditions. + */ + @VisibleForTesting + ConfigPersistenceFactory(Configs configs, boolean setupDatabase, boolean useConfigDatabase) { + this.configs = configs; + this.setupDatabase = setupDatabase; + this.useConfigDatabase = useConfigDatabase; + } + + /** + * Create a config persistence based on the configs. + *

+ * If config root is defined, create a database config persistence and copy the configs from the + * file-based config persistence. Otherwise, seed the database from the yaml files. + */ + public ConfigPersistence create() throws IOException { + if (!useConfigDatabase) { + Path configRoot = configs.getConfigRoot(); + LOGGER.info("Use file system config persistence (root: {})", configRoot); + return FileSystemConfigPersistence.createWithValidation(configRoot); + } + + if (configs.getConfigRoot() == null) { + // This branch will only be true in a future Airbyte version, in which + // the config root is no longer required and everything lives in the database. + return createDbPersistenceWithYamlSeed(); + } + + return createDbPersistenceWithFileSeed(); + } + + @VisibleForTesting + ConfigPersistence createDbPersistenceWithYamlSeed() throws IOException { + ConfigPersistence seedConfigPersistence = new YamlSeedConfigPersistence(); + return createDbPersistence(seedConfigPersistence); + } + + @VisibleForTesting + ConfigPersistence createDbPersistenceWithFileSeed() throws IOException { + Path configRoot = configs.getConfigRoot(); + ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot); + return createDbPersistence(fsConfigPersistence); + } + + @VisibleForTesting + ConfigPersistence createDbPersistence(ConfigPersistence seedConfigPersistence) throws IOException { + LOGGER.info("Use database config persistence."); + + // When setupDatabase is true, it means the database will be initialized after we + // connect to the database. So the database itself is considered ready as long as + // the connection is alive. Otherwise, the database is expected to have full data. + Function isReady = setupDatabase + ? Databases.IS_CONFIG_DATABASE_CONNECTED + : Databases.IS_CONFIG_DATABASE_LOADED_WITH_DATA; + + Database database = Databases.createPostgresDatabaseWithRetry( + configs.getConfigDatabaseUser(), + configs.getConfigDatabasePassword(), + configs.getConfigDatabaseUrl(), + isReady); + + DatabaseConfigPersistence dbConfigPersistence = new DatabaseConfigPersistence(database); + if (setupDatabase) { + dbConfigPersistence.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA)); + dbConfigPersistence.loadData(seedConfigPersistence); + } + + return new ValidatingConfigPersistence(dbConfigPersistence); + } + + } + + public static Builder builder(Configs configs) { + return new Builder(configs); + } + + public static class Builder { + + private final Configs configs; + private boolean setupDatabase = false; + private boolean useConfigDatabase = true; + + private Builder(Configs configs) { + this.configs = configs; + } + + public Builder setupDatabase(boolean setupDatabase) { + this.setupDatabase = setupDatabase; + return this; + } + + public Builder useConfigDatabase(boolean useConfigDatabase) { + this.useConfigDatabase = useConfigDatabase; + return this; + } + + public ConfigPersistence build() throws IOException { + return new ConfigPersistenceFactory(configs, setupDatabase, useConfigDatabase).create(); + } + + } + +} diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceFactory.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceFactory.java deleted file mode 100644 index 7fe985a274cb6..0000000000000 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceFactory.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.config.persistence; - -import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA; - -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.config.Configs; -import io.airbyte.db.Database; -import io.airbyte.db.Databases; -import java.io.IOException; -import java.nio.file.Path; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * By default, this factory returns a database config persistence. it can still return a file system - * config persistence for testing purpose. This legacy feature should be removed after the file to - * database migration is completely done. - */ -public class ConfigPersistenceFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistenceFactory.class); - - private final Configs configs; - private final boolean setupDatabase; - private final boolean useConfigDatabase; - - /** - * @param setupDatabase initialize the database and load data; this is necessary because this method - * has multiple callers, and we want to setup the database only once to prevent race - * conditions. - */ - private ConfigPersistenceFactory(Configs configs, boolean setupDatabase, boolean useConfigDatabase) { - this.configs = configs; - this.setupDatabase = setupDatabase; - this.useConfigDatabase = useConfigDatabase; - } - - public static Builder build(Configs configs) { - return new Builder(configs); - } - - public static class Builder { - - private final Configs configs; - private boolean setupDatabase = false; - private boolean useConfigDatabase = true; - - private Builder(Configs configs) { - this.configs = configs; - } - - public Builder setupDatabase(boolean setupDatabase) { - this.setupDatabase = setupDatabase; - return this; - } - - public Builder useConfigDatabase(boolean useConfigDatabase) { - this.useConfigDatabase = useConfigDatabase; - return this; - } - - public ConfigPersistenceFactory get() { - return new ConfigPersistenceFactory(configs, setupDatabase, useConfigDatabase); - } - - } - - /** - * Create a config persistence based on the configs. - *

- * If config root is defined, create a database config persistence and copy the configs from the - * file-based config persistence. Otherwise, seed the database from the yaml files. - */ - public ConfigPersistence create() throws IOException { - if (!useConfigDatabase) { - Path configRoot = configs.getConfigRoot(); - LOGGER.info("Use file system config persistence (root: {})", configRoot); - return FileSystemConfigPersistence.createWithValidation(configRoot); - } - - if (configs.getConfigRoot() == null) { - // This branch will only be true in a future Airbyte version, in which - // the config root is no longer required and everything lives in the database. - return createDbPersistenceWithYamlSeed(); - } - - return createDbPersistenceWithFileSeed(); - } - - ConfigPersistence createDbPersistenceWithYamlSeed() throws IOException { - ConfigPersistence seedConfigPersistence = new YamlSeedConfigPersistence(); - return createDbPersistence(seedConfigPersistence); - } - - ConfigPersistence createDbPersistenceWithFileSeed() throws IOException { - Path configRoot = configs.getConfigRoot(); - ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot); - return createDbPersistence(fsConfigPersistence); - } - - ConfigPersistence createDbPersistence(ConfigPersistence seedConfigPersistence) throws IOException { - LOGGER.info("Use database config persistence."); - - // When setupDatabase is true, it means the database will be initialized after we - // connect to the database. So the database itself is considered ready as long as - // the connection is alive. Otherwise, the database is expected to have full data. - Function isReady = setupDatabase - ? Databases.IS_CONFIG_DATABASE_CONNECTED - : Databases.IS_CONFIG_DATABASE_LOADED_WITH_DATA; - - Database database = Databases.createPostgresDatabaseWithRetry( - configs.getConfigDatabaseUser(), - configs.getConfigDatabasePassword(), - configs.getConfigDatabaseUrl(), - isReady); - - DatabaseConfigPersistence dbConfigPersistence = new DatabaseConfigPersistence(database); - if (setupDatabase) { - dbConfigPersistence.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA)); - dbConfigPersistence.loadData(seedConfigPersistence); - } - - return new ValidatingConfigPersistence(dbConfigPersistence); - } - -} diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceFactoryTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java similarity index 89% rename from airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceFactoryTest.java rename to airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java index b63326fa647b7..c22952ccd24f9 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceFactoryTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java @@ -44,6 +44,7 @@ import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigPersistenceBuilder.ConfigPersistenceFactory; import io.airbyte.db.Database; import io.airbyte.db.Databases; import java.io.IOException; @@ -64,7 +65,7 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.PostgreSQLContainer; -class ConfigPersistenceFactoryTest extends BaseTest { +class ConfigPersistenceBuilderTest extends BaseTest { private static PostgreSQLContainer container; private static Configs configs; @@ -103,7 +104,7 @@ void tearDown() throws Exception { @Test public void testCreateDbPersistenceWithYamlSeed() throws IOException { - ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().createDbPersistenceWithYamlSeed(); + ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, true, false).createDbPersistenceWithYamlSeed(); ConfigPersistence seedPersistence = new YamlSeedConfigPersistence(); assertSameConfigDump(seedPersistence.dumpConfigs(), dbPersistence.dumpConfigs()); } @@ -111,14 +112,14 @@ public void testCreateDbPersistenceWithYamlSeed() throws IOException { @Test public void testCreateDbPersistenceWithFileSeed() throws Exception { Path testRoot = Path.of("/tmp/cpf_test_file_seed"); - Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceFactoryTest.class.getName()); + Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName()); ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath); writeSource(seedPersistence, SOURCE_GITHUB); writeDestination(seedPersistence, DESTINATION_S3); when(configs.getConfigRoot()).thenReturn(rootPath); - ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().createDbPersistenceWithFileSeed(); + ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, true, false).createDbPersistenceWithFileSeed(); int dbConfigSize = (int) dbPersistence.dumpConfigs().values().stream() .map(stream -> stream.collect(Collectors.toList())) .mapToLong(Collection::size) @@ -147,7 +148,7 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception { ConfigPersistence seedPersistence = spy(new YamlSeedConfigPersistence()); // When setupDatabase is false, the createDbPersistence method does not initialize // the database itself, but it expects that the database has already been initialized. - ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(false).get().createDbPersistence(seedPersistence); + ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, false, false).createDbPersistence(seedPersistence); // The return persistence is not initialized by the seed persistence, and has only one config. verify(seedPersistence, never()).dumpConfigs(); assertSameConfigDump( @@ -158,14 +159,14 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception { @Test public void testCreateFileSystemConfigPersistence() throws Exception { Path testRoot = Path.of("/tmp/cpf_test_file_system"); - Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceFactoryTest.class.getName()); + Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName()); ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath); writeSource(seedPersistence, SOURCE_GITHUB); writeDestination(seedPersistence, DESTINATION_S3); when(configs.getConfigRoot()).thenReturn(rootPath); - ConfigPersistence filePersistence = ConfigPersistenceFactory.build(configs).useConfigDatabase(false).get().create(); + ConfigPersistence filePersistence = new ConfigPersistenceFactory(configs, true, false).create(); assertSameConfigDump(seedPersistence.dumpConfigs(), filePersistence.dumpConfigs()); } @@ -187,10 +188,10 @@ public void testMigrateFromFileToDbPersistence() throws Exception { // first run uses file system config persistence, and adds an extra workspace Path testRoot = Path.of("/tmp/cpf_test_migration"); - Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceFactoryTest.class.getName()); + Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName()); when(configs.getConfigRoot()).thenReturn(rootPath); - ConfigPersistence filePersistence = ConfigPersistenceFactory.build(configs).useConfigDatabase(false).get().create(); + ConfigPersistence filePersistence = new ConfigPersistenceFactory(configs, true, false).create(); filePersistence.replaceAllConfigs(seedConfigs, false); filePersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, extraWorkspace.getWorkspaceId().toString(), extraWorkspace); @@ -198,7 +199,7 @@ public void testMigrateFromFileToDbPersistence() throws Exception { // second run uses database config persistence; // the only difference is that useConfigDatabase is no longer overridden to false; // the extra workspace should be ported to this persistence - ConfigPersistence dbPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().create(); + ConfigPersistence dbPersistence = new ConfigPersistenceFactory(configs, true, false).create(); Map> expected = Map.of( ConfigSchema.STANDARD_WORKSPACE.name(), Stream.of(Jsons.jsonNode(DEFAULT_WORKSPACE), Jsons.jsonNode(extraWorkspace)), ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), Stream.of(Jsons.jsonNode(SOURCE_GITHUB), Jsons.jsonNode(SOURCE_POSTGRES)), diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 34e0a1078b651..3902f8d9dc8d2 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -32,7 +32,7 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigPersistence; -import io.airbyte.config.persistence.ConfigPersistenceFactory; +import io.airbyte.config.persistence.ConfigPersistenceBuilder; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; import io.airbyte.db.Databases; @@ -209,7 +209,7 @@ public static void main(String[] args) throws IOException, InterruptedException final ProcessFactory processFactory = getProcessBuilderFactory(configs); final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); - final ConfigPersistence configPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(false).get().create(); + final ConfigPersistence configPersistence = ConfigPersistenceBuilder.builder(configs).setupDatabase(false).build(); final ConfigRepository configRepository = new ConfigRepository(configPersistence); final JobCleaner jobCleaner = new JobCleaner( configs.getWorkspaceRetentionConfig(), diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 1df6a3a7210b1..0af58d5080fa5 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -35,7 +35,7 @@ import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigPersistence; -import io.airbyte.config.persistence.ConfigPersistenceFactory; +import io.airbyte.config.persistence.ConfigPersistenceBuilder; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.PersistenceConstants; import io.airbyte.db.Database; @@ -209,7 +209,7 @@ public static void runServer(final Set requestFilters, MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getServerLogsRoot(configs).toString()); LOGGER.info("Creating config repository..."); - final ConfigPersistence configPersistence = ConfigPersistenceFactory.build(configs).setupDatabase(true).get().create(); + final ConfigPersistence configPersistence = ConfigPersistenceBuilder.builder(configs).setupDatabase(true).build(); final ConfigRepository configRepository = new ConfigRepository(configPersistence); // hack: upon installation we need to assign a random customerId so that when