Skip to content


Initial implementation of jdbc based DurableStateStore and query
Browse files Browse the repository at this point in the history
  • Loading branch information
debasishg committed Jul 1, 2021
1 parent 61d5648 commit 3bf7b7b
Show file tree
Hide file tree
Showing 30 changed files with 1,780 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ target
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package akka.persistence.jdbc.integration

import com.typesafe.config.ConfigFactory
import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec
import akka.persistence.jdbc.testkit.internal.Postgres

class PostgresScalaJdbcDurableStateStoreQueryTest
extends JdbcDurableStateSpec(ConfigFactory.load("postgres-shared-db-application.conf"), Postgres) {
implicit lazy val system: ActorSystem =
ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers))
122 changes: 121 additions & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ jdbc-read-journal {
query-delay = 1 second
# The maximum backoff time before trying to query again in case of database failures
max-backoff-query-delay = 1 minute
# The ask timeout to use when querying the journal sequence actor, the actor should normally repond very quickly,
# The ask timeout to use when querying the journal sequence actor, the actor should normally respond very quickly,
# since it always replies with its current internal state
ask-timeout = 1 second
Expand Down Expand Up @@ -541,3 +541,123 @@ jdbc-read-journal {

# the akka-persistence-durable-state-store in use
jdbc-durable-state-store {
class = "akka.persistence.jdbc.state.scaladsl.DurableStateStore"

# number of records fetched from the store at once
batchSize = 500
# New states are retrieved (polled) with this interval.
refreshInterval = "1s"

tables {
state {
tableName = "state"
schemaName = ""
columnNames {
globalOffset = "global_offset"
persistenceId = "persistence_id"
statePayload = "state_payload"
tag = "tag"
revision = "revision"
stateSerId = "state_serial_id"
stateSerManifest = "state_serial_manifest"
stateTimestamp = "state_timestamp"

# Settings for determining if ids (ordering column) in the durable-state are out of sequence.
durable-state-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
# In case a number in the sequence is missing, this is the ammount of retries that will be done to see
# if the number is still found. Note that the time after which a number in the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
max-tries = 10
# How often the actor will query for new data
query-delay = 1 second
# The maximum backoff time before trying to query again in case of database failures
max-backoff-query-delay = 1 minute
# The ask timeout to use when querying the durable-state sequence actor, the actor should normally respond very quickly,
# since it always replies with its current internal state
ask-timeout = 1 second

slick {

# This property indicates which profile must be used by Slick.
# Possible values are:
# - slick.jdbc.PostgresProfile$
# - slick.jdbc.MySQLProfile$
# - slick.jdbc.H2Profile$
# - slick.jdbc.SQLServerProfile$
# - slick.jdbc.OracleProfile$
# (uncomment and set the property below to match your needs)
# profile = "slick.jdbc.PostgresProfile$"

db {
connectionPool = "HikariCP"

# The JDBC URL for the chosen database
# (uncomment and set the property below to match your needs)
# url = "jdbc:postgresql://localhost:5432/akka-plugin"

# The database username
# (uncomment and set the property below to match your needs)
# user = "akka-plugin"

# The username's password
# (uncomment and set the property below to match your needs)
# password = "akka-plugin"

# The JDBC driver to use
# (uncomment and set the property below to match your needs)
# driver = "org.postgresql.Driver"

# hikariCP settings; see:
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
# 1000ms is the minimum value. Default: 180000 (3 minutes)
connectionTimeout = 180000

# This property controls the maximum amount of time that a connection will be tested for aliveness.
# This value must be less than the connectionTimeout. The lowest accepted validation timeout is 1000ms (1 second). Default: 5000
validationTimeout = 5000

# 10 minutes: This property controls the maximum amount of time that a connection is allowed to sit idle in the pool.
# Whether a connection is retired as idle or not is subject to a maximum variation of +30 seconds, and average variation
# of +15 seconds. A connection will never be retired as idle before this timeout. A value of 0 means that idle connections
# are never removed from the pool. Default: 600000 (10 minutes)
idleTimeout = 600000

# 30 minutes: This property controls the maximum lifetime of a connection in the pool. When a connection reaches this timeout
# it will be retired from the pool, subject to a maximum variation of +30 seconds. An in-use connection will never be retired,
# only when it is closed will it then be removed. We strongly recommend setting this value, and it should be at least 30 seconds
# less than any database-level connection timeout. A value of 0 indicates no maximum lifetime (infinite lifetime),
# subject of course to the idleTimeout setting. Default: 1800000 (30 minutes)
maxLifetime = 1800000

# This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a
# possible connection leak. A value of 0 means leak detection is disabled.
# Lowest acceptable value for enabling leak detection is 2000 (2 secs). Default: 0
leakDetectionThreshold = 0

# ensures that the database does not get dropped while we are using it
keepAliveConnection = on

# See some tips on thread/connection pool sizing on
# Keep in mind that the number of threads must equal the maximum number of connections.
numThreads = 20
maxConnections = 20
minConnections = 20
14 changes: 14 additions & 0 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,17 @@ CREATE TABLE IF NOT EXISTS "snapshot" (
PRIMARY KEY("persistence_id","sequence_number")

"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BLOB NOT NULL,
"state_serial_id" INTEGER NOT NULL,
"state_serial_manifest" VARCHAR,
"tag" VARCHAR,
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
CREATE INDEX "state_tag_idx" on "state" ("tag");

1 change: 1 addition & 0 deletions core/src/main/resources/schema/h2/h2-drop-schema.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
13 changes: 13 additions & 0 deletions core/src/main/resources/schema/postgres/postgres-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,16 @@ CREATE TABLE IF NOT EXISTS public.snapshot (
PRIMARY KEY(persistence_id, sequence_number)

"global_offset" BIGSERIAL,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BYTEA NOT NULL,
"state_serial_id" INTEGER NOT NULL,
"state_serial_manifest" VARCHAR(255),
"tag" VARCHAR,
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
CREATE INDEX CONCURRENTLY "state_tag_idx" on "state" ("tag");
CREATE INDEX CONCURRENTLY "state_global_offset_idx" on "state" ("global_offset");
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS public.event_tag;
DROP TABLE IF EXISTS public.event_journal;
DROP TABLE IF EXISTS public.snapshot;
DROP TABLE IF EXISTS public.state;

Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
* Copyright (C) 2019 - 2021 Lightbend Inc. <>

package akka.persistence.jdbc.journal.dao
package akka.persistence.jdbc

import akka.annotation.InternalApi
import akka.persistence.PersistentRepr
import akka.persistence.jdbc.state.DurableStateTables
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
import akka.serialization.{ Serialization, Serializers }

Expand Down Expand Up @@ -57,4 +58,8 @@ object AkkaSerialization {
} yield (withMeta.withTimestamp(row.writeTimestamp), row.ordering)

def fromDurableStateRow(serialization: Serialization)(row: DurableStateTables.DurableStateRow): Try[AnyRef] = {
serialization.deserialize(row.statePayload, row.stateSerId, row.stateSerManifest.getOrElse(""))
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,42 @@ class ReadJournalConfig(config: Config) {
override def toString: String =

class DurableStateTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.state.columnNames")
val globalOffset: String = cfg.getString("globalOffset")
val persistenceId: String = cfg.getString("persistenceId")
val statePayload: String = cfg.getString("statePayload")
val tag: String = cfg.getString("tag")
val revision: String = cfg.getString("revision")
val stateSerId: String = cfg.getString("stateSerId")
val stateSerManifest: String = cfg.getString("stateSerManifest")
val stateTimestamp: String = cfg.getString("stateTimestamp")

class DurableStateTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.state")
val refreshInterval: FiniteDuration = config.asFiniteDuration("refreshInterval")
val batchSize: Int = config.getInt("batchSize")
val tableName: String = cfg.getString("tableName")
val schemaName: Option[String] = cfg.asStringOption("schemaName")
val columnNames: DurableStateTableColumnNames = new DurableStateTableColumnNames(config)
val stateSequenceConfig = DurableStateSequenceRetrievalConfig(config)
override def toString: String = s"DurableStateTableConfiguration($tableName,$schemaName,$columnNames)"

object DurableStateSequenceRetrievalConfig {
def apply(config: Config): DurableStateSequenceRetrievalConfig =
batchSize = config.getInt("durable-state-sequence-retrieval.batch-size"),
maxTries = config.getInt("durable-state-sequence-retrieval.max-tries"),
queryDelay = config.asFiniteDuration("durable-state-sequence-retrieval.query-delay"),
maxBackoffQueryDelay = config.asFiniteDuration("durable-state-sequence-retrieval.max-backoff-query-delay"),
askTimeout = config.asFiniteDuration("durable-state-sequence-retrieval.ask-timeout"))
case class DurableStateSequenceRetrievalConfig(
batchSize: Int,
maxTries: Int,
queryDelay: FiniteDuration,
maxBackoffQueryDelay: FiniteDuration,
askTimeout: FiniteDuration)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package akka.persistence.jdbc.journal.dao

import akka.NotUsed
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
import akka.persistence.journal.Tagged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package akka.persistence.jdbc.query.dao
import akka.NotUsed
import akka.persistence.PersistentRepr
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.ReadJournalConfig
import akka.persistence.jdbc.journal.dao.{ AkkaSerialization, BaseJournalDaoWithReadMessages, H2Compat }
import akka.persistence.jdbc.journal.dao.{ BaseJournalDaoWithReadMessages, H2Compat }
import akka.serialization.Serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.serialization.Serialization
import SnapshotTables._
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.journal.dao.AkkaSerialization
import akka.persistence.jdbc.AkkaSerialization

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Success, Try }
Expand Down

0 comments on commit 3bf7b7b

Please sign in to comment.