From 1b3b3bec42bbab55994d925327b1276c84d94d31 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Thu, 29 Jul 2021 12:22:47 +0200 Subject: [PATCH 1/2] Durable state DDL is all configurable --- .../jdbc/state/DurableStateQueries.scala | 46 +++++++++---------- .../jdbc/state/SequenceNextValUpdater.scala | 15 +++--- 2 files changed, 27 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala index 1ea3dc5b1..d1f852eef 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -28,8 +28,8 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab } lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match { - case "H2" => new H2SequenceNextValUpdater(profile) - case "Postgres" => new PostgresSequenceNextValUpdater(profile) + case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg) + case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg) case _ => ??? } @@ -41,18 +41,16 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab durableStateTable.filter(_.persistenceId === persistenceId) private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = { - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 - sqlu"""INSERT INTO durable_state + sqlu"""INSERT INTO #${durableStateTableCfg.tableName} ( - persistence_id, - global_offset, - revision, - state_payload, - state_serial_id, - state_serial_manifest, - tag, - state_timestamp + #${durableStateTableCfg.columnNames.persistenceId}, + #${durableStateTableCfg.columnNames.globalOffset}, + #${durableStateTableCfg.columnNames.revision}, + #${durableStateTableCfg.columnNames.statePayload}, + #${durableStateTableCfg.columnNames.stateSerId}, + #${durableStateTableCfg.columnNames.stateSerManifest}, + #${durableStateTableCfg.columnNames.tag}, + #${durableStateTableCfg.columnNames.stateTimestamp} ) VALUES ( @@ -69,18 +67,16 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab } private[jdbc] def updateDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = { - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 - sqlu"""UPDATE durable_state - SET global_offset = #${seqNextValue}, - revision = ${row.revision}, - state_payload = ${row.statePayload}, - state_serial_id = ${row.stateSerId}, - state_serial_manifest = ${row.stateSerManifest}, - tag = ${row.tag}, - state_timestamp = ${System.currentTimeMillis} - WHERE persistence_id = ${row.persistenceId} - AND revision = ${row.revision} - 1 + sqlu"""UPDATE #${durableStateTableCfg.tableName} + SET #${durableStateTableCfg.columnNames.globalOffset} = #${seqNextValue}, + #${durableStateTableCfg.columnNames.revision} = ${row.revision}, + #${durableStateTableCfg.columnNames.statePayload} = ${row.statePayload}, + #${durableStateTableCfg.columnNames.stateSerId} = ${row.stateSerId}, + #${durableStateTableCfg.columnNames.stateSerManifest} = ${row.stateSerManifest}, + #${durableStateTableCfg.columnNames.tag} = ${row.tag}, + #${durableStateTableCfg.columnNames.stateTimestamp} = ${System.currentTimeMillis} + WHERE #${durableStateTableCfg.columnNames.persistenceId} = ${row.persistenceId} + AND #${durableStateTableCfg.columnNames.revision} = ${row.revision} - 1 """ } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala index 96adf5dfe..fe5aae65b 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala @@ -5,6 +5,7 @@ package akka.persistence.jdbc.state +import akka.persistence.jdbc.config.DurableStateTableConfiguration import slick.jdbc.JdbcProfile import slick.dbio.Effect import slick.sql.SqlStreamingAction @@ -13,26 +14,22 @@ private[jdbc] trait SequenceNextValUpdater { def getSequenceNextValueExpr(): SqlStreamingAction[Vector[String], String, Effect] } -class H2SequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater { +class H2SequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) extends SequenceNextValUpdater { import profile.api._ // H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column) - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 def getSequenceNextValueExpr() = { sql"""SELECT COLUMN_DEFAULT FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'durable_state' - AND COLUMN_NAME = 'global_offset' + WHERE TABLE_NAME = '#${durableStateTableCfg.tableName}' + AND COLUMN_NAME = '#${durableStateTableCfg.columnNames.globalOffset}' AND TABLE_SCHEMA = 'PUBLIC'""".as[String] } } -class PostgresSequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater { +class PostgresSequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) extends SequenceNextValUpdater { import profile.api._ - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 - final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('durable_state', 'global_offset')))""" + final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('${durableStateTableCfg.tableName}', '${durableStateTableCfg.columnNames.globalOffset}')))""" def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] } From a83e91d0e854ba26a0cff140378a8feeb5331700 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Thu, 29 Jul 2021 12:27:39 +0200 Subject: [PATCH 2/2] scalafmt --- .../persistence/jdbc/state/SequenceNextValUpdater.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala index fe5aae65b..b29491b69 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala @@ -14,7 +14,8 @@ private[jdbc] trait SequenceNextValUpdater { def getSequenceNextValueExpr(): SqlStreamingAction[Vector[String], String, Effect] } -class H2SequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) extends SequenceNextValUpdater { +class H2SequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) + extends SequenceNextValUpdater { import profile.api._ // H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column) @@ -27,9 +28,11 @@ class H2SequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: D } } -class PostgresSequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) extends SequenceNextValUpdater { +class PostgresSequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) + extends SequenceNextValUpdater { import profile.api._ - final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('${durableStateTableCfg.tableName}', '${durableStateTableCfg.columnNames.globalOffset}')))""" + final val nextValFetcher = + s"""(SELECT nextval(pg_get_serial_sequence('${durableStateTableCfg.tableName}', '${durableStateTableCfg.columnNames.globalOffset}')))""" def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] }