diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala index 1ea3dc5b1..d1f852eef 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -28,8 +28,8 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab } lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match { - case "H2" => new H2SequenceNextValUpdater(profile) - case "Postgres" => new PostgresSequenceNextValUpdater(profile) + case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg) + case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg) case _ => ??? } @@ -41,18 +41,16 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab durableStateTable.filter(_.persistenceId === persistenceId) private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = { - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 - sqlu"""INSERT INTO durable_state + sqlu"""INSERT INTO #${durableStateTableCfg.tableName} ( - persistence_id, - global_offset, - revision, - state_payload, - state_serial_id, - state_serial_manifest, - tag, - state_timestamp + #${durableStateTableCfg.columnNames.persistenceId}, + #${durableStateTableCfg.columnNames.globalOffset}, + #${durableStateTableCfg.columnNames.revision}, + #${durableStateTableCfg.columnNames.statePayload}, + #${durableStateTableCfg.columnNames.stateSerId}, + #${durableStateTableCfg.columnNames.stateSerManifest}, + #${durableStateTableCfg.columnNames.tag}, + #${durableStateTableCfg.columnNames.stateTimestamp} ) VALUES ( @@ -69,18 +67,16 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab } private[jdbc] def updateDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = { - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 - sqlu"""UPDATE durable_state - SET global_offset = #${seqNextValue}, - revision = ${row.revision}, - state_payload = ${row.statePayload}, - state_serial_id = ${row.stateSerId}, - state_serial_manifest = ${row.stateSerManifest}, - tag = ${row.tag}, - state_timestamp = ${System.currentTimeMillis} - WHERE persistence_id = ${row.persistenceId} - AND revision = ${row.revision} - 1 + sqlu"""UPDATE #${durableStateTableCfg.tableName} + SET #${durableStateTableCfg.columnNames.globalOffset} = #${seqNextValue}, + #${durableStateTableCfg.columnNames.revision} = ${row.revision}, + #${durableStateTableCfg.columnNames.statePayload} = ${row.statePayload}, + #${durableStateTableCfg.columnNames.stateSerId} = ${row.stateSerId}, + #${durableStateTableCfg.columnNames.stateSerManifest} = ${row.stateSerManifest}, + #${durableStateTableCfg.columnNames.tag} = ${row.tag}, + #${durableStateTableCfg.columnNames.stateTimestamp} = ${System.currentTimeMillis} + WHERE #${durableStateTableCfg.columnNames.persistenceId} = ${row.persistenceId} + AND #${durableStateTableCfg.columnNames.revision} = ${row.revision} - 1 """ } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala index 96adf5dfe..b29491b69 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala @@ -5,6 +5,7 @@ package akka.persistence.jdbc.state +import akka.persistence.jdbc.config.DurableStateTableConfiguration import slick.jdbc.JdbcProfile import slick.dbio.Effect import slick.sql.SqlStreamingAction @@ -13,26 +14,25 @@ private[jdbc] trait SequenceNextValUpdater { def getSequenceNextValueExpr(): SqlStreamingAction[Vector[String], String, Effect] } -class H2SequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater { +class H2SequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) + extends SequenceNextValUpdater { import profile.api._ // H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column) - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 def getSequenceNextValueExpr() = { sql"""SELECT COLUMN_DEFAULT FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'durable_state' - AND COLUMN_NAME = 'global_offset' + WHERE TABLE_NAME = '#${durableStateTableCfg.tableName}' + AND COLUMN_NAME = '#${durableStateTableCfg.columnNames.globalOffset}' AND TABLE_SCHEMA = 'PUBLIC'""".as[String] } } -class PostgresSequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater { +class PostgresSequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration) + extends SequenceNextValUpdater { import profile.api._ - // FIXME: read the table name and column names from durableStateTableCfg - // https://github.com/akka/akka-persistence-jdbc/issues/573 - final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('durable_state', 'global_offset')))""" + final val nextValFetcher = + s"""(SELECT nextval(pg_get_serial_sequence('${durableStateTableCfg.tableName}', '${durableStateTableCfg.columnNames.globalOffset}')))""" def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] }