Skip to content

Commit

Permalink
Merge pull request #572 from akka/rename-to-durable-state
Browse files Browse the repository at this point in the history
  • Loading branch information
ignasi35 authored Jul 29, 2021
2 parents 3143f77 + 0843f0d commit 8fddf2b
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 34 deletions.
11 changes: 7 additions & 4 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -552,17 +552,20 @@ jdbc-durable-state-store {
refreshInterval = "1s"

tables {
state {
tableName = "state"
durable_state {
## The table and column names are not always read and used in SQL statements. If you change
## these values you may need to edit some source code
## https://github.com/akka/akka-persistence-jdbc/issues/573
tableName = "durable_state"
schemaName = ""
columnNames {
globalOffset = "global_offset"
persistenceId = "persistence_id"
statePayload = "state_payload"
tag = "tag"
revision = "revision"
statePayload = "state_payload"
stateSerId = "state_serial_id"
stateSerManifest = "state_serial_manifest"
tag = "tag"
stateTimestamp = "state_timestamp"
}
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,19 @@ CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
"snapshot" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
);


CREATE TABLE IF NOT EXISTS "durable_state" (
"global_offset" BIGINT NOT NULL AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BLOB NOT NULL,
"state_serial_id" INTEGER NOT NULL,
"state_serial_manifest" VARCHAR,
"tag" VARCHAR,
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
);

CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
7 changes: 3 additions & 4 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS "snapshot" (
PRIMARY KEY("persistence_id","sequence_number")
);

CREATE TABLE IF NOT EXISTS "state" (
CREATE TABLE IF NOT EXISTS "durable_state" (
"global_offset" BIGINT NOT NULL AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
Expand All @@ -50,6 +50,5 @@ CREATE TABLE IF NOT EXISTS "state" (
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
);
CREATE INDEX "state_tag_idx" on "state" ("tag");


CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
DROP TABLE IF EXISTS PUBLIC."journal";
DROP TABLE IF EXISTS PUBLIC."snapshot";
DROP TABLE IF EXISTS PUBLIC."durable_state";
2 changes: 1 addition & 1 deletion core/src/main/resources/schema/h2/h2-drop-schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE IF EXISTS PUBLIC."event_tag";
DROP TABLE IF EXISTS PUBLIC."event_journal";
DROP TABLE IF EXISTS PUBLIC."snapshot";
DROP TABLE IF EXISTS PUBLIC."state";
DROP TABLE IF EXISTS PUBLIC."durable_state";
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS public.snapshot (
PRIMARY KEY(persistence_id, sequence_number)
);

CREATE TABLE IF NOT EXISTS public.state (
CREATE TABLE IF NOT EXISTS public.durable_state (
global_offset BIGSERIAL,
persistence_id VARCHAR(255) NOT NULL,
revision BIGINT NOT NULL,
Expand All @@ -28,5 +28,5 @@ CREATE TABLE IF NOT EXISTS public.state (
state_timestamp BIGINT NOT NULL,
PRIMARY KEY(persistence_id)
);
CREATE INDEX CONCURRENTLY state_tag_idx on public.state (tag);
CREATE INDEX CONCURRENTLY state_global_offset_idx on public.state (global_offset);
CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag);
CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset);
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ CREATE TABLE IF NOT EXISTS public.snapshot (
PRIMARY KEY(persistence_id, sequence_number)
);

CREATE TABLE IF NOT EXISTS public.state (
CREATE TABLE IF NOT EXISTS public.durable_state (
global_offset BIGSERIAL,
persistence_id VARCHAR(255) NOT NULL,
revision BIGINT NOT NULL,
Expand All @@ -58,5 +58,5 @@ CREATE TABLE IF NOT EXISTS public.state (
state_timestamp BIGINT NOT NULL,
PRIMARY KEY(persistence_id)
);
CREATE INDEX CONCURRENTLY state_tag_idx on public.state (tag);
CREATE INDEX CONCURRENTLY state_global_offset_idx on public.state (global_offset);
CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag);
CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset);
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
DROP TABLE IF EXISTS public.journal;
DROP TABLE IF EXISTS public.snapshot;
DROP TABLE IF EXISTS public.state;
DROP TABLE IF EXISTS public.durable_state;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
DROP TABLE IF EXISTS public.event_tag;
DROP TABLE IF EXISTS public.event_journal;
DROP TABLE IF EXISTS public.snapshot;
DROP TABLE IF EXISTS public.state;
DROP TABLE IF EXISTS public.durable_state;

Original file line number Diff line number Diff line change
Expand Up @@ -196,22 +196,22 @@ class ReadJournalConfig(config: Config) {
}

class DurableStateTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.state.columnNames")
private val cfg = config.getConfig("tables.durable_state.columnNames")
val globalOffset: String = cfg.getString("globalOffset")
val persistenceId: String = cfg.getString("persistenceId")
val statePayload: String = cfg.getString("statePayload")
val tag: String = cfg.getString("tag")
val revision: String = cfg.getString("revision")
val statePayload: String = cfg.getString("statePayload")
val stateSerId: String = cfg.getString("stateSerId")
val stateSerManifest: String = cfg.getString("stateSerManifest")
val tag: String = cfg.getString("tag")
val stateTimestamp: String = cfg.getString("stateTimestamp")
}

class DurableStateTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.state")
private val cfg = config.getConfig("tables.durable_state")
val tableName: String = cfg.getString("tableName")
val refreshInterval: FiniteDuration = config.asFiniteDuration("refreshInterval")
val batchSize: Int = config.getInt("batchSize")
val tableName: String = cfg.getString("tableName")
val schemaName: Option[String] = cfg.asStringOption("schemaName")
val columnNames: DurableStateTableColumnNames = new DurableStateTableColumnNames(config)
val stateSequenceConfig = DurableStateSequenceRetrievalConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab
durableStateTable.filter(_.persistenceId === persistenceId)

private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = {

sqlu"""INSERT INTO state
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
sqlu"""INSERT INTO durable_state
(
persistence_id,
persistence_id,
global_offset,
revision,
state_payload,
state_serial_id,
state_serial_manifest,
tag,
revision,
state_payload,
state_serial_id,
state_serial_manifest,
tag,
state_timestamp
)
VALUES
Expand All @@ -68,9 +69,11 @@ class DurableStateQueries(val profile: JdbcProfile, override val durableStateTab
}

private[jdbc] def updateDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = {
sqlu"""UPDATE state
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
sqlu"""UPDATE durable_state
SET global_offset = #${seqNextValue},
revision = ${row.revision},
revision = ${row.revision},
state_payload = ${row.statePayload},
state_serial_id = ${row.stateSerId},
state_serial_manifest = ${row.stateSerManifest},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ class H2SequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpda
import profile.api._

// H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column)
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
def getSequenceNextValueExpr() = {
sql"""SELECT COLUMN_DEFAULT
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'state'
WHERE TABLE_NAME = 'durable_state'
AND COLUMN_NAME = 'global_offset'
AND TABLE_SCHEMA = 'PUBLIC'""".as[String]
}
}

class PostgresSequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater {
import profile.api._
final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('state', 'global_offset')))"""
// FIXME: read the table name and column names from durableStateTableCfg
// https://github.com/akka/akka-persistence-jdbc/issues/573
final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('durable_state', 'global_offset')))"""

def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String]
}

0 comments on commit 8fddf2b

Please sign in to comment.