From a600f6ae47389b04b23640c1f2fea6a92a4b10cf Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 14 Jun 2022 14:27:38 -0700 Subject: [PATCH] Migrate StateDB to support per stream states (#13731) * Update StateDB to support per Stream states. * Add `StateType` type * Add `steam_name`, `namespace` and `type` to `state` table. * Set the default StateType to LEGACY --- .../airbyte/bootloader/BootloaderAppTest.java | 2 +- ...001__AddStreamDescriptorsToStateTable.java | 99 ++++++++ .../configs_database/schema_dump.txt | 8 + ..._AddStreamDescriptorsToStateTableTest.java | 226 ++++++++++++++++++ 4 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTable.java create mode 100644 airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTableTest.java diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index b1b9dc0af361f..38366c889537b 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -129,7 +129,7 @@ void testBootloaderAppBlankDb() throws Exception { val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); // this line should change with every new migration // to show that you meant to make a new migration to the prod database - assertEquals("0.39.1.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.39.17.001", configsMigrator.getLatestMigration().getVersion().getVersion()); val jobsPersistence = new DefaultJobPersistence(jobDatabase); assertEquals(version, jobsPersistence.getVersion().get()); diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTable.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTable.java new file mode 100644 index 0000000000000..5505378858c0a --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTable.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.Catalog; +import org.jooq.DSLContext; +import org.jooq.EnumType; +import org.jooq.Schema; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.jooq.impl.SchemaImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_39_17_001__AddStreamDescriptorsToStateTable extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_39_17_001__AddStreamDescriptorsToStateTable.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + + migrate(ctx); + } + + @VisibleForTesting + public static void migrate(final DSLContext ctx) { + createStateTypeEnum(ctx); + addStreamDescriptorFieldsToStateTable(ctx); + } + + private static void createStateTypeEnum(final DSLContext ctx) { + ctx.createType(StateType.NAME) + .asEnum(Arrays.stream(StateType.values()).map(StateType::getLiteral).toList()) + .execute(); + } + + private static void addStreamDescriptorFieldsToStateTable(final DSLContext ctx) { + final String STATE_TABLE = "state"; + + ctx.alterTable(STATE_TABLE) + .add(Arrays.asList( + DSL.field("stream_name", SQLDataType.CLOB.nullable(true)), + DSL.field("namespace", SQLDataType.CLOB.nullable(true)), + // type defaults to LEGACY to first set the expected type of all existing states + DSL.field("type", SQLDataType.VARCHAR.asEnumDataType(StateType.class).nullable(false).defaultValue(StateType.LEGACY)), + DSL.constraint("state__connection_id__stream_name__namespace__uq") + .unique(DSL.field("connection_id"), DSL.field("stream_name"), DSL.field("namespace")))) + .execute(); + } + + public enum StateType implements EnumType { + + GLOBAL("GLOBAL"), + STREAM("STREAM"), + LEGACY("LEGACY"); + + public static final String NAME = "state_type"; + + StateType(String literal) { + this.literal = literal; + } + + @Override + public String getLiteral() { + return literal; + } + + @Override + public Catalog getCatalog() { + return getSchema().getCatalog(); + } + + @Override + public Schema getSchema() { + return new SchemaImpl(DSL.name("public")); + } + + @Override + public String getName() { + return NAME; + } + + private final String literal; + + } + +} diff --git a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt index fc109b9dd411e..97a509d159661 100644 --- a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt @@ -138,6 +138,9 @@ create table "public"."state"( "state" jsonb null, "created_at" timestamptz(35) not null default null, "updated_at" timestamptz(35) not null default null, + "stream_name" text null, + "namespace" text null, + "type" state_type not null default null, constraint "state_pkey" primary key ( "id", @@ -276,6 +279,11 @@ create unique index "connection_operation_pkey" on "public"."connection_operatio "operation_id" asc ); create unique index "operation_pkey" on "public"."operation"("id" asc); +create unique index "state__connection_id__stream_name__namespace__uq" on "public"."state"( + "connection_id" asc, + "stream_name" asc, + "namespace" asc +); create unique index "state_pkey" on "public"."state"( "id" asc, "connection_id" asc diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTableTest.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTableTest.java new file mode 100644 index 0000000000000..901fedacda7bc --- /dev/null +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_39_17_001__AddStreamDescriptorsToStateTableTest.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import io.airbyte.db.factory.FlywayFactory; +import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; +import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; +import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType; +import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.NamespaceDefinitionType; +import io.airbyte.db.instance.configs.migrations.V0_39_17_001__AddStreamDescriptorsToStateTable.StateType; +import io.airbyte.db.instance.development.DevDatabaseMigrator; +import java.util.UUID; +import org.flywaydb.core.Flyway; +import org.jooq.DSLContext; +import org.jooq.JSONB; +import org.jooq.exception.DataAccessException; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class V0_39_17_001__AddStreamDescriptorsToStateTableTest extends AbstractConfigsDatabaseTest { + + private final String STATE_TABLE = "State"; + + private UUID connection1; + private UUID connection2; + + @Test + public void testSimpleMigration() { + final DSLContext context = getDslContext(); + + // Adding a couple of states + context.insertInto(DSL.table(STATE_TABLE)) + .columns( + DSL.field("id"), + DSL.field("connection_id")) + .values(UUID.randomUUID(), connection1) + .values(UUID.randomUUID(), connection2) + .execute(); + + // Preconditions check: we should have one row in state + Assertions.assertEquals(2, context.select().from(STATE_TABLE).execute()); + + // Applying the migration + devConfigsDbMigrator.migrate(); + + final UUID newState = UUID.randomUUID(); + context.insertInto(DSL.table(STATE_TABLE)) + .columns( + DSL.field("id"), + DSL.field("connection_id"), + DSL.field("stream_name")) + .values(newState, connection1, "new_stream") + .execute(); + + System.out.println(context.selectFrom("connection").fetch()); + System.out.println(context.selectFrom(STATE_TABLE).fetch()); + + // Our two initial rows and the new row should be LEGACY + Assertions.assertEquals(3, + context.select() + .from(STATE_TABLE) + .where(DSL.field("type").equal(StateType.LEGACY)) + .execute()); + + // There should be no STREAM or GLOBAL + Assertions.assertEquals(0, + context.select() + .from(STATE_TABLE) + .where(DSL.field("type").in(StateType.GLOBAL, StateType.STREAM)) + .execute()); + } + + @Test + public void testUniquenessConstraint() { + devConfigsDbMigrator.migrate(); + + final DSLContext context = getDslContext(); + context.insertInto(DSL.table(STATE_TABLE)) + .columns( + DSL.field("id"), + DSL.field("connection_id"), + DSL.field("type"), + DSL.field("stream_name"), + DSL.field("namespace")) + .values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream1", "ns2") + .execute(); + + context.insertInto(DSL.table(STATE_TABLE)) + .columns( + DSL.field("id"), + DSL.field("connection_id"), + DSL.field("type"), + DSL.field("stream_name"), + DSL.field("namespace")) + .values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream1", "ns1") + .execute(); + + context.insertInto(DSL.table(STATE_TABLE)) + .columns( + DSL.field("id"), + DSL.field("connection_id"), + DSL.field("type"), + DSL.field("stream_name"), + DSL.field("namespace")) + .values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream2", "ns2") + .execute(); + + Assertions.assertThrows(DataAccessException.class, () -> { + context.insertInto(DSL.table(STATE_TABLE)) + .columns( + DSL.field("id"), + DSL.field("connection_id"), + DSL.field("type"), + DSL.field("stream_name"), + DSL.field("namespace")) + .values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream1", "ns2") + .execute(); + }); + } + + @BeforeEach + public void beforeEach() { + Flyway flyway = FlywayFactory.create(dataSource, "V0_39_17_001__AddStreamDescriptorsToStateTableTest", ConfigsDatabaseMigrator.DB_IDENTIFIER, + ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); + ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway); + devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator); + + devConfigsDbMigrator.createBaseline(); + injectMockData(); + } + + @AfterEach + public void afterEach() { + // Making sure we reset between tests + dslContext.dropSchemaIfExists("public").cascade().execute(); + dslContext.createSchema("public").execute(); + dslContext.setSchema("public").execute(); + } + + private void injectMockData() { + final DSLContext context = getDslContext(); + + UUID workspaceId = UUID.randomUUID(); + UUID actorId = UUID.randomUUID(); + UUID actorDefinitionId = UUID.randomUUID(); + connection1 = UUID.randomUUID(); + connection2 = UUID.randomUUID(); + + context.insertInto(DSL.table("workspace")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("slug"), + DSL.field("initial_setup_complete")) + .values( + workspaceId, + "base workspace", + "base_workspace", + true) + .execute(); + context.insertInto(DSL.table("actor_definition")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("docker_repository"), + DSL.field("docker_image_tag"), + DSL.field("actor_type"), + DSL.field("spec")) + .values( + actorDefinitionId, + "Jenkins", + "farosai/airbyte-jenkins-source", + "0.1.23", + ActorType.source, + JSONB.valueOf("{}")) + .execute(); + context.insertInto(DSL.table("actor")) + .columns( + DSL.field("id"), + DSL.field("workspace_id"), + DSL.field("actor_definition_id"), + DSL.field("name"), + DSL.field("configuration"), + DSL.field("actor_type")) + .values( + actorId, + workspaceId, + actorDefinitionId, + "ActorName", + JSONB.valueOf("{}"), + ActorType.source) + .execute(); + + insertConnection(context, connection1, actorId); + insertConnection(context, connection2, actorId); + } + + private void insertConnection(final DSLContext context, final UUID connectionId, final UUID actorId) { + context.insertInto(DSL.table("connection")) + .columns( + DSL.field("id"), + DSL.field("namespace_definition"), + DSL.field("source_id"), + DSL.field("destination_id"), + DSL.field("name"), + DSL.field("catalog"), + DSL.field("manual")) + .values( + connectionId, + NamespaceDefinitionType.source, + actorId, + actorId, + "Connection" + connectionId.toString(), + JSONB.valueOf("{}"), + true) + .execute(); + } + + private DevDatabaseMigrator devConfigsDbMigrator; + +}