Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create jobs database tables without init container #4942

Merged
merged 21 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;

import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Configs;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,25 +104,19 @@ ConfigPersistence getDbPersistence(ConfigPersistence seedConfigPersistence) thro

DatabaseConfigPersistence dbConfigPersistence;
if (setupDatabase) {
// When we need to setup the database, it means the database will be initialized after
// we connect to the database. So the database itself is considered ready as long as
// the connection is alive.
Database database = Databases.createPostgresDatabaseWithRetry(
Database database = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl(),
Databases.IS_CONFIG_DATABASE_CONNECTED);
configs.getConfigDatabaseUrl())
.getAndInitialize();
dbConfigPersistence = new DatabaseConfigPersistence(database)
.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA))
.loadData(seedConfigPersistence);
} else {
// When we don't need to setup the database, it means the database is initialized
// somewhere else, and it is considered ready only when data has been loaded into it.
Database database = Databases.createPostgresDatabaseWithRetry(
Database database = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl(),
Databases.IS_CONFIG_DATABASE_LOADED_WITH_DATA);
configs.getConfigDatabaseUrl())
.getInitialized();
dbConfigPersistence = new DatabaseConfigPersistence(database);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.config.persistence.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.UPDATED_AT;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.select;

Expand Down Expand Up @@ -67,25 +67,6 @@ public DatabaseConfigPersistence(Database database) {
this.database = new ExceptionWrappingDatabase(database);
}

/**
* Initialize the database by creating the {@code airbyte_configs} table.
*/
public DatabaseConfigPersistence initialize(String schema) throws IOException {
database.transaction(ctx -> {
boolean hasConfigsTable = ctx.fetchExists(select()
.from("information_schema.tables")
.where("table_name = 'airbyte_configs'"));
if (hasConfigsTable) {
return null;
}
LOGGER.info("Config database has not been initialized");
LOGGER.info("Creating tables with schema: {}", schema);
ctx.execute(schema);
return null;
});
return this;
}

/**
* Populate the {@code airbyte_configs} table with configs from the seed persistence. Only do so if
* the table is empty. Otherwise, we assume that it has been populated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.config.persistence.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.UPDATED_AT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -40,12 +39,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -92,8 +90,8 @@ public static void dbDown() {

@BeforeEach
public void setup() throws Exception {
database = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl());
database.transaction(ctx -> ctx.execute("DROP TABLE IF EXISTS airbyte_configs"));
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
database.transaction(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs"));
}

@AfterEach
Expand Down Expand Up @@ -130,10 +128,8 @@ public void testCreateDbPersistenceWithFileSeed() throws Exception {
@Test
public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception {
// Initialize the database with one config.
String schema = MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA);
Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
database.transaction(ctx -> {
ctx.execute(schema);
ctx.insertInto(AIRBYTE_CONFIGS)
.set(CONFIG_ID, SOURCE_GITHUB.getSourceDefinitionId().toString())
.set(CONFIG_TYPE, ConfigSchema.STANDARD_SOURCE_DEFINITION.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,26 @@

package io.airbyte.config.persistence;

import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static io.airbyte.config.persistence.AirbyteConfigsTable.AIRBYTE_CONFIGS_TABLE_SCHEMA;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_BLOB;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_ID;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CONFIG_TYPE;
import static io.airbyte.config.persistence.AirbyteConfigsTable.CREATED_AT;
import static io.airbyte.config.persistence.AirbyteConfigsTable.UPDATED_AT;
import static io.airbyte.db.instance.configs.AirbyteConfigsTable.AIRBYTE_CONFIGS;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.DSL.select;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import java.sql.Timestamp;
import java.time.Instant;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.jooq.JSONB;
import org.jooq.Record1;
import org.jooq.Result;
import org.jooq.exception.DataAccessException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -87,9 +74,8 @@ public static void dbDown() {

@BeforeEach
public void setup() throws Exception {
database = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl());
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
configPersistence = new DatabaseConfigPersistence(database);
configPersistence.initialize(MoreResources.readResource(AIRBYTE_CONFIGS_TABLE_SCHEMA));
database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs"));
}

Expand All @@ -98,25 +84,6 @@ void tearDown() throws Exception {
database.close();
}

@Test
public void testInitialize() throws Exception {
// check table
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS)));
// check columns (if any of the column does not exist, the query will throw exception)
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CONFIG_ID.eq("ID"))));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CONFIG_TYPE.eq("TYPE"))));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CONFIG_BLOB.eq(JSONB.valueOf("{}")))));
Timestamp timestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(CREATED_AT.eq(timestamp))));
database.query(ctx -> ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where(UPDATED_AT.eq(timestamp))));

// when the airbyte_configs has been created, calling initialize again will not change anything
String testSchema = "CREATE TABLE IF NOT EXISTS airbyte_test_configs(id BIGINT PRIMARY KEY);";
configPersistence.initialize(testSchema);
// the airbyte_test_configs table does not exist
assertThrows(DataAccessException.class, () -> database.query(ctx -> ctx.fetchExists(select().from("airbyte_test_configs"))));
}

@Test
public void testLoadData() throws Exception {
ConfigPersistence seedPersistence = mock(ConfigPersistence.class);
Expand Down
2 changes: 1 addition & 1 deletion airbyte-db/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM postgres:13-alpine

COPY src/main/resources/schema.sql /docker-entrypoint-initdb.d/000_init.sql
COPY src/main/resources/init.sql /docker-entrypoint-initdb.d/000_init.sql
11 changes: 11 additions & 0 deletions airbyte-db/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# How to Create a New Database

Check `io.airbyte.db.instance.configs` for example.

- Create a new package under `io.airbyte.db.instance` with the name of the database.
- Create the database schema enum that defines all tables in the database.
- Write a SQL script that initializes the database.
- The default path for this file is `resource/<db-name>_database/schema.sql`.
- Implement the `DatabaseInstance` interface that initializes the database by executing the initialization script.
- [Optional] For each table, create a constant class that defines the table and the columns in jooq.
- This is necessary only if you plan to use jooq to query the table.
1 change: 1 addition & 0 deletions airbyte-db/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
api 'org.postgresql:postgresql:42.2.18'

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-json-validation')

testImplementation project(':airbyte-test-utils')

Expand Down
30 changes: 0 additions & 30 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

package io.airbyte.db;

import static org.jooq.impl.DSL.select;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
Expand All @@ -43,34 +41,6 @@ public class Databases {

private static final Logger LOGGER = LoggerFactory.getLogger(Databases.class);

// The Job Database is initialized by SQL script, which writes a server UUID at the end.
// So this database is ready when the server UUID record is present.
public static final Function<Database, Boolean> IS_JOB_DATABASE_READY = database -> {
try {
Optional<String> uuid = ServerUuid.get(database);
return uuid.isPresent();
} catch (Exception e) {
return false;
}
};
public static final Function<Database, Boolean> IS_CONFIG_DATABASE_CONNECTED = database -> {
try {
LOGGER.info("Testing config database connection...");
return database.query(ctx -> ctx.fetchExists(select().from("information_schema.tables")));
} catch (Exception e) {
LOGGER.info("Unsuccessful connection to config database", e);
return false;
}
};
public static final Function<Database, Boolean> IS_CONFIG_DATABASE_LOADED_WITH_DATA = database -> {
try {
LOGGER.info("Testing if airbyte_configs has been created...");
return database.query(ctx -> ctx.fetchExists(select().from("airbyte_configs")));
} catch (Exception e) {
return false;
}
};

public static Database createPostgresDatabase(String username, String password, String jdbcConnectionString) {
return createDatabase(username, password, jdbcConnectionString, "org.postgresql.Driver", SQLDialect.POSTGRES);
}
Expand Down
Loading