From f40c90641bf3d8b554555848eef4d4e8fd6b6b02 Mon Sep 17 00:00:00 2001 From: AndyChen Date: Fri, 14 Apr 2023 00:27:05 +0800 Subject: [PATCH] support rolling updates #710 --- core/src/main/resources/reference.conf | 15 +++++++- .../h2/h2-create-schema-tags-legacy.sql | 9 +++++ .../resources/schema/h2/h2-create-schema.sql | 2 +- .../mysql/mysql-create-schema-tags-legacy.sql | 8 ++++ .../schema/mysql/mysql-create-schema.sql | 4 +- .../oracle-create-schema-tags-legacy.sql | 7 ++++ .../schema/oracle/oracle-create-schema.sql | 2 +- .../postgres-create-schema-tags-legacy.sql | 9 +++++ .../postgres/postgres-create-schema.sql | 4 +- .../sqlserver-create-schema-tags-legacy.sql | 9 +++++ .../sqlserver/sqlserver-create-schema.sql | 4 +- .../jdbc/config/AkkaPersistenceConfig.scala | 11 ++++++ .../jdbc/journal/dao/JournalQueries.scala | 38 ++++++++++++++----- .../jdbc/journal/dao/JournalTables.scala | 17 ++++++++- .../jdbc/query/dao/ReadJournalQueries.scala | 8 +++- .../testkit/internal/SchemaUtilsImpl.scala | 4 ++ 16 files changed, 130 insertions(+), 21 deletions(-) create mode 100644 core/src/main/resources/schema/h2/h2-create-schema-tags-legacy.sql create mode 100644 core/src/main/resources/schema/mysql/mysql-create-schema-tags-legacy.sql create mode 100644 core/src/main/resources/schema/oracle/oracle-create-schema-tags-legacy.sql create mode 100644 core/src/main/resources/schema/postgres/postgres-create-schema-tags-legacy.sql create mode 100644 core/src/main/resources/schema/sqlserver/sqlserver-create-schema-tags-legacy.sql diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index e232db49a..2d09bba93 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -134,7 +134,7 @@ jdbc-journal { } event_tag { - tableName = "event_tag" + tableName = "event_tags" schemaName = "" columnNames { @@ -144,6 +144,19 @@ jdbc-journal { } } + legacy_event_tag { + tableName = "event_tag" + schemaName = "" + + columnNames { + eventId = "event_id" + tag = "tag" + } + } + # for rolling updates the event_tag table migration. + write-legacy-tag = false + read-legacy-tag = false + # Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058 plugin-dispatcher = "akka.actor.default-dispatcher" } diff --git a/core/src/main/resources/schema/h2/h2-create-schema-tags-legacy.sql b/core/src/main/resources/schema/h2/h2-create-schema-tags-legacy.sql new file mode 100644 index 000000000..41e5ae278 --- /dev/null +++ b/core/src/main/resources/schema/h2/h2-create-schema-tags-legacy.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS "event_tag" ( + "event_id" BIGINT NOT NULL, + "tag" VARCHAR NOT NULL, + PRIMARY KEY("event_id", "tag"), + CONSTRAINT fk_event_journal + FOREIGN KEY("event_id") + REFERENCES "event_journal"("ordering") + ON DELETE CASCADE +); \ No newline at end of file diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql b/core/src/main/resources/schema/h2/h2-create-schema.sql index 339d9031a..80f0e73d7 100644 --- a/core/src/main/resources/schema/h2/h2-create-schema.sql +++ b/core/src/main/resources/schema/h2/h2-create-schema.sql @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS "event_journal" ( CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering"); -CREATE TABLE IF NOT EXISTS "event_tag" ( +CREATE TABLE IF NOT EXISTS "event_tags" ( "persistence_id" VARCHAR(255) NOT NULL, "sequence_number" BIGINT NOT NULL, "tag" VARCHAR NOT NULL, diff --git a/core/src/main/resources/schema/mysql/mysql-create-schema-tags-legacy.sql b/core/src/main/resources/schema/mysql/mysql-create-schema-tags-legacy.sql new file mode 100644 index 000000000..390f0dcb5 --- /dev/null +++ b/core/src/main/resources/schema/mysql/mysql-create-schema-tags-legacy.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS event_tag ( + event_id BIGINT UNSIGNED NOT NULL, + tag VARCHAR(255) NOT NULL, + PRIMARY KEY(event_id, tag), + FOREIGN KEY (event_id) + REFERENCES event_journal(ordering) + ON DELETE CASCADE + ) \ No newline at end of file diff --git a/core/src/main/resources/schema/mysql/mysql-create-schema.sql b/core/src/main/resources/schema/mysql/mysql-create-schema.sql index 4911bd34a..427727ff6 100644 --- a/core/src/main/resources/schema/mysql/mysql-create-schema.sql +++ b/core/src/main/resources/schema/mysql/mysql-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS event_journal( +CREATE TABLE IF NOT EXISTS event_journal ( ordering SERIAL, deleted BOOLEAN DEFAULT false NOT NULL, persistence_id VARCHAR(255) NOT NULL, @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS event_journal( CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); -CREATE TABLE IF NOT EXISTS event_tag ( +CREATE TABLE IF NOT EXISTS event_tags ( persistence_id VARCHAR(255) NOT NULL, sequence_number BIGINT NOT NULL, tag VARCHAR(255) NOT NULL, diff --git a/core/src/main/resources/schema/oracle/oracle-create-schema-tags-legacy.sql b/core/src/main/resources/schema/oracle/oracle-create-schema-tags-legacy.sql new file mode 100644 index 000000000..a24404e9c --- /dev/null +++ b/core/src/main/resources/schema/oracle/oracle-create-schema-tags-legacy.sql @@ -0,0 +1,7 @@ +CREATE TABLE EVENT_TAG ( + EVENT_ID NUMERIC NOT NULL, + TAG VARCHAR(255) NOT NULL, + PRIMARY KEY(EVENT_ID, TAG), + FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING) + ON DELETE CASCADE +) \ No newline at end of file diff --git a/core/src/main/resources/schema/oracle/oracle-create-schema.sql b/core/src/main/resources/schema/oracle/oracle-create-schema.sql index ff25c6dad..df98c2b49 100644 --- a/core/src/main/resources/schema/oracle/oracle-create-schema.sql +++ b/core/src/main/resources/schema/oracle/oracle-create-schema.sql @@ -22,7 +22,7 @@ CREATE TABLE EVENT_JOURNAL ( CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOURNAL REFERENCING NEW AS NEW FOR EACH ROW WHEN (new.ORDERING is null) begin select EVENT_JOURNAL__ORDERING_seq.nextval into :new.ORDERING from sys.dual; end; / -CREATE TABLE EVENT_TAG ( +CREATE TABLE EVENT_TAGS ( PERSISTENCE_ID VARCHAR(255) NOT NULL, SEQUENCE_NUMBER NUMERIC NOT NULL, TAG VARCHAR(255) NOT NULL, diff --git a/core/src/main/resources/schema/postgres/postgres-create-schema-tags-legacy.sql b/core/src/main/resources/schema/postgres/postgres-create-schema-tags-legacy.sql new file mode 100644 index 000000000..b56d14d85 --- /dev/null +++ b/core/src/main/resources/schema/postgres/postgres-create-schema-tags-legacy.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS public.event_tag( + event_id BIGINT, + tag VARCHAR(256), + PRIMARY KEY(event_id, tag), + CONSTRAINT fk_event_journal + FOREIGN KEY(event_id) + REFERENCES event_journal(ordering) + ON DELETE CASCADE +); \ No newline at end of file diff --git a/core/src/main/resources/schema/postgres/postgres-create-schema.sql b/core/src/main/resources/schema/postgres/postgres-create-schema.sql index 58c10ad73..d8637cc22 100644 --- a/core/src/main/resources/schema/postgres/postgres-create-schema.sql +++ b/core/src/main/resources/schema/postgres/postgres-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS public.event_journal( +CREATE TABLE IF NOT EXISTS public.event_journal ( ordering BIGSERIAL, persistence_id VARCHAR(255) NOT NULL, sequence_number BIGINT NOT NULL, @@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS public.event_journal( CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering); -CREATE TABLE IF NOT EXISTS public.event_tag( +CREATE TABLE IF NOT EXISTS public.event_tags ( persistence_id VARCHAR(255) NOT NULL, sequence_number BIGINT NOT NULL, tag VARCHAR(256), diff --git a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema-tags-legacy.sql b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema-tags-legacy.sql new file mode 100644 index 000000000..36e08c97b --- /dev/null +++ b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema-tags-legacy.sql @@ -0,0 +1,9 @@ +CREATE TABLE event_tag ( + "event_id" BIGINT NOT NULL, + "tag" NVARCHAR(255) NOT NULL + PRIMARY KEY ("event_id","tag") + constraint "fk_event_journal" + foreign key("event_id") + references "dbo"."event_journal"("ordering") + on delete CASCADE +); \ No newline at end of file diff --git a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql index 9fcd70b22..5f015e399 100644 --- a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql +++ b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE event_journal( +CREATE TABLE event_journal ( "ordering" BIGINT IDENTITY(1,1) NOT NULL, "deleted" BIT DEFAULT 0 NOT NULL, "persistence_id" NVARCHAR(255) NOT NULL, @@ -17,7 +17,7 @@ CREATE TABLE event_journal( CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); -CREATE TABLE event_tag ( +CREATE TABLE event_tags ( "persistence_id" NVARCHAR(255) NOT NULL, "sequence_number" NUMERIC(10,0) NOT NULL, "tag" NVARCHAR(255) NOT NULL diff --git a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala index 559b638bc..860cde7cc 100644 --- a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala @@ -57,6 +57,11 @@ class EventTagTableColumnNames(config: Config) { val sequenceNumber: String = cfg.getString("sequenceNumber") val tag: String = cfg.getString("tag") } +class LegacyEventTagTableColumnNames(config: Config) { + private val cfg = config.getConfig("tables.legacy_event_tag.columnNames") + val eventId: String = cfg.getString("eventId") + val tag: String = cfg.getString("tag") +} class LegacyJournalTableConfiguration(config: Config) { private val cfg = config.getConfig("tables.legacy_journal") @@ -75,9 +80,15 @@ class EventJournalTableConfiguration(config: Config) { } class EventTagTableConfiguration(config: Config) { private val cfg = config.getConfig("tables.event_tag") + private val legacyCfg = config.getConfig("tables.legacy_event_tag") + val writeLegacyTag: Boolean = config.getBoolean("tables.write-legacy-tag") + val readLegacyTag: Boolean = config.getBoolean("tables.read-legacy-tag") val tableName: String = cfg.getString("tableName") val schemaName: Option[String] = cfg.asStringOption("schemaName") val columnNames: EventTagTableColumnNames = new EventTagTableColumnNames(config) + val legacyTableName: String = legacyCfg.getString("tableName") + val legacySchemaName: Option[String] = legacyCfg.asStringOption("schemaName") + val legacyColumnNames: LegacyEventTagTableColumnNames = new LegacyEventTagTableColumnNames(config) } class LegacySnapshotTableColumnNames(config: Config) { private val cfg = config.getConfig("tables.legacy_snapshot.columnNames") diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala index 7b2a16248..52ae798cb 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala @@ -6,7 +6,7 @@ package akka.persistence.jdbc.journal.dao import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration } -import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow } +import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, LegacyTagRow, TagRow } import slick.jdbc.JdbcProfile import scala.concurrent.ExecutionContext @@ -20,25 +20,45 @@ class JournalQueries( import profile.api._ private val JournalTableC = Compiled(JournalTable) + private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering)) private val TagTableC = Compiled(TagTable) + private val LegacyTagTableC = Compiled(LegacyTagTable) - def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { + def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])( + implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { val sorted = xs.sortBy((event => event._1.sequenceNumber)) - // not matter what, always insert event. - val events = sorted.map(_._1) if (sorted.exists(_._2.nonEmpty)) { // only if there are any tags - val tagsRows = sorted.flatMap(e => e._2.map(v => TagRow(e._1.persistenceId, e._1.sequenceNumber, v))) - for { - _ <- JournalTableC ++= events - _ <- TagTableC ++= tagsRows - } yield () + writeEventsAndTags(sorted) } else { // optimization avoid some work when not using tags + val events = sorted.map(_._1) JournalTableC ++= events } } + private def writeEventsAndTags(sorted: Seq[(JournalAkkaSerializationRow, Set[String])])( + implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { + val (events, tags) = sorted.unzip + val tagRows = sorted.flatMap { case (eventRow, tags) => + tags.map(t => TagRow(eventRow.persistenceId, eventRow.sequenceNumber, t)) + } + if (tagTableCfg.writeLegacyTag) { + for { + ids <- insertAndReturn ++= events + tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag => LegacyTagRow(id, tag)) } + _ <- LegacyTagTableC ++= tagInserts + _ <- TagTableC ++= tagRows + } yield () + } else { + // optimization using batch insert + for { + _ <- JournalTableC ++= events + _ <- TagTableC ++= tagRows + } yield () + } + } + private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) = selectAllJournalForPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala index 1a3897bcf..d98e8261d 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala @@ -7,7 +7,7 @@ package akka.persistence.jdbc.journal.dao import akka.annotation.InternalApi import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration } -import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow } +import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, LegacyTagRow, TagRow } /** * INTERNAL API @@ -30,6 +30,7 @@ object JournalTables { metaSerManifest: Option[String]) case class TagRow(persistenceId: String, sequenceNumber: Long, tag: String) + case class LegacyTagRow(eventId: Long, tag: String) } /** @@ -44,7 +45,6 @@ trait JournalTables { def journalTableCfg: EventJournalTableConfiguration def tagTableCfg: EventTagTableConfiguration - class JournalEvents(_tableTag: Tag) extends Table[JournalAkkaSerializationRow]( _tableTag, @@ -104,4 +104,17 @@ trait JournalTables { } lazy val TagTable = new TableQuery(tag => new EventTags(tag)) + + class LegacyEventTags(_tableTag: Tag) + extends Table[LegacyTagRow](_tableTag, tagTableCfg.legacySchemaName, tagTableCfg.legacyTableName) { + override def * = (eventId, tag) <> (LegacyTagRow.tupled, LegacyTagRow.unapply) + + val eventId: Rep[Long] = column[Long](tagTableCfg.legacyColumnNames.eventId) + val tag: Rep[String] = column[String](tagTableCfg.legacyColumnNames.tag) + + val pk = primaryKey(s"${tagTableCfg.legacyTableName}_pk", (eventId, tag)) + val journalEvent = foreignKey(s"fk_legacy_${journalTableCfg.tableName}", eventId, JournalTable)(_.ordering) + } + + lazy val LegacyTagTable = new TableQuery(tag => new LegacyEventTags(tag)) } diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala index b55a12aa5..c89a59b4f 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala @@ -28,7 +28,13 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo JournalTable.filter(_.deleted === false) private def baseTableWithTagsQuery() = { - baseTableQuery().join(TagTable).on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber === t.sequenceNumber) + if (tagTableCfg.readLegacyTag) { + baseTableQuery().join(LegacyTagTable).on(_.ordering === _.eventId) + } else { + baseTableQuery() + .join(TagTable) + .on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber === t.sequenceNumber) + } } val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _) diff --git a/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala b/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala index 20916b71a..da7285edb 100644 --- a/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala +++ b/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala @@ -33,6 +33,10 @@ private[jdbc] object SchemaUtilsImpl { def legacy(configKey: String, config: Config): Boolean = config.getConfig(configKey).getString("dao") != "akka.persistence.jdbc.journal.dao.DefaultJournalDao" + // TODO How to gracefully create and drop legacy tags table. + def eventTagsLegacy(configKey: String, config: Config): Boolean = + config.getConfig(configKey).getBoolean("tables.use-legacy-tag") + /** * INTERNAL API */