Skip to content

Commit

Permalink
Merge pull request #575 from akka/use-configurable-table-names
Browse files Browse the repository at this point in the history
Durable state DDL is all configurable
  • Loading branch information
ignasi35 authored Jul 29, 2021
2 parents 8fddf2b + a83e91d commit 60b649b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab
}

lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match {
case "H2" => new H2SequenceNextValUpdater(profile)
case "Postgres" => new PostgresSequenceNextValUpdater(profile)
case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg)
case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg)
case _ => ???
}

Expand All @@ -41,18 +41,16 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab
durableStateTable.filter(_.persistenceId === persistenceId)

private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = {
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
sqlu"""INSERT INTO durable_state
sqlu"""INSERT INTO #${durableStateTableCfg.tableName}
(
persistence_id,
global_offset,
revision,
state_payload,
state_serial_id,
state_serial_manifest,
tag,
state_timestamp
#${durableStateTableCfg.columnNames.persistenceId},
#${durableStateTableCfg.columnNames.globalOffset},
#${durableStateTableCfg.columnNames.revision},
#${durableStateTableCfg.columnNames.statePayload},
#${durableStateTableCfg.columnNames.stateSerId},
#${durableStateTableCfg.columnNames.stateSerManifest},
#${durableStateTableCfg.columnNames.tag},
#${durableStateTableCfg.columnNames.stateTimestamp}
)
VALUES
(
Expand All @@ -69,18 +67,16 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab
}

private[jdbc] def updateDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = {
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
sqlu"""UPDATE durable_state
SET global_offset = #${seqNextValue},
revision = ${row.revision},
state_payload = ${row.statePayload},
state_serial_id = ${row.stateSerId},
state_serial_manifest = ${row.stateSerManifest},
tag = ${row.tag},
state_timestamp = ${System.currentTimeMillis}
WHERE persistence_id = ${row.persistenceId}
AND revision = ${row.revision} - 1
sqlu"""UPDATE #${durableStateTableCfg.tableName}
SET #${durableStateTableCfg.columnNames.globalOffset} = #${seqNextValue},
#${durableStateTableCfg.columnNames.revision} = ${row.revision},
#${durableStateTableCfg.columnNames.statePayload} = ${row.statePayload},
#${durableStateTableCfg.columnNames.stateSerId} = ${row.stateSerId},
#${durableStateTableCfg.columnNames.stateSerManifest} = ${row.stateSerManifest},
#${durableStateTableCfg.columnNames.tag} = ${row.tag},
#${durableStateTableCfg.columnNames.stateTimestamp} = ${System.currentTimeMillis}
WHERE #${durableStateTableCfg.columnNames.persistenceId} = ${row.persistenceId}
AND #${durableStateTableCfg.columnNames.revision} = ${row.revision} - 1
"""
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package akka.persistence.jdbc.state

import akka.persistence.jdbc.config.DurableStateTableConfiguration
import slick.jdbc.JdbcProfile
import slick.dbio.Effect
import slick.sql.SqlStreamingAction
Expand All @@ -13,26 +14,25 @@ private[jdbc] trait SequenceNextValUpdater {
def getSequenceNextValueExpr(): SqlStreamingAction[Vector[String], String, Effect]
}

class H2SequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater {
class H2SequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {
import profile.api._

// H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column)
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
def getSequenceNextValueExpr() = {
sql"""SELECT COLUMN_DEFAULT
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'durable_state'
AND COLUMN_NAME = 'global_offset'
WHERE TABLE_NAME = '#${durableStateTableCfg.tableName}'
AND COLUMN_NAME = '#${durableStateTableCfg.columnNames.globalOffset}'
AND TABLE_SCHEMA = 'PUBLIC'""".as[String]
}
}

class PostgresSequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater {
class PostgresSequenceNextValUpdater(profile: JdbcProfile, val durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {
import profile.api._
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('durable_state', 'global_offset')))"""
final val nextValFetcher =
s"""(SELECT nextval(pg_get_serial_sequence('${durableStateTableCfg.tableName}', '${durableStateTableCfg.columnNames.globalOffset}')))"""

def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String]
}

0 comments on commit 60b649b

Please sign in to comment.