Initial version of JDBC implementation of DurableState #544

Closed · wants to merge 1 commit

3 changes: 3 additions & 0 deletions .gitignore
@@ -9,3 +9,6 @@ target
.project
.worksheet
.bsp
*.code-workspace
.bloop
.metals
@@ -0,0 +1,12 @@
package akka.persistence.jdbc.integration

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec
import akka.persistence.jdbc.testkit.internal.Postgres

class PostgresScalaJdbcDurableStateStoreQueryTest
extends JdbcDurableStateSpec(ConfigFactory.load("postgres-shared-db-application.conf"), Postgres) {
implicit lazy val system: ActorSystem =
ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers))
}
122 changes: 121 additions & 1 deletion core/src/main/resources/reference.conf
@@ -450,7 +450,7 @@ jdbc-read-journal {
query-delay = 1 second
# The maximum backoff time before trying to query again in case of database failures
max-backoff-query-delay = 1 minute
-# The ask timeout to use when querying the journal sequence actor, the actor should normally repond very quickly,
+# The ask timeout to use when querying the journal sequence actor, the actor should normally respond very quickly,
# since it always replies with its current internal state
ask-timeout = 1 second
}
@@ -541,3 +541,123 @@ jdbc-read-journal {
}
}
}

# the akka-persistence-durable-state-store in use
jdbc-durable-state-store {
class = "akka.persistence.jdbc.state.scaladsl.DurableStateStore"

# number of records fetched from the store at once
batchSize = 500
# New states are retrieved (polled) with this interval.
refreshInterval = "1s"

tables {
state {
tableName = "state"
schemaName = ""
columnNames {
globalOffset = "global_offset"
persistenceId = "persistence_id"
statePayload = "state_payload"
tag = "tag"
revision = "revision"
stateSerId = "state_serial_id"
stateSerManifest = "state_serial_manifest"
stateTimestamp = "state_timestamp"
}
}
}

# Settings for determining whether ids (ordering column) in the durable-state table are out of sequence.
durable-state-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
# In case a number in the sequence is missing, this is the number of retries that will be made to see
# if the number is still found. Note that the time after which a number in the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
max-tries = 10
# How often the actor will query for new data
query-delay = 1 second
# The maximum backoff time before trying to query again in case of database failures
max-backoff-query-delay = 1 minute
# The ask timeout to use when querying the durable-state sequence actor, the actor should normally respond very quickly,
# since it always replies with its current internal state
ask-timeout = 1 second
}

slick {

# This property indicates which profile must be used by Slick.
# Possible values are:
# - slick.jdbc.PostgresProfile$
# - slick.jdbc.MySQLProfile$
# - slick.jdbc.H2Profile$
# - slick.jdbc.SQLServerProfile$
# - slick.jdbc.OracleProfile$
# (uncomment and set the property below to match your needs)
# profile = "slick.jdbc.PostgresProfile$"

db {
connectionPool = "HikariCP"

# The JDBC URL for the chosen database
# (uncomment and set the property below to match your needs)
# url = "jdbc:postgresql://localhost:5432/akka-plugin"

# The database username
# (uncomment and set the property below to match your needs)
# user = "akka-plugin"

# The username's password
# (uncomment and set the property below to match your needs)
# password = "akka-plugin"

# The JDBC driver to use
# (uncomment and set the property below to match your needs)
# driver = "org.postgresql.Driver"

# HikariCP settings; see: https://github.com/brettwooldridge/HikariCP
# Slick will use an async executor with a fixed size queue of 10,000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
# 1000ms is the minimum value. Default: 180000 (3 minutes)
connectionTimeout = 180000

# This property controls the maximum amount of time that a connection will be tested for aliveness.
# This value must be less than the connectionTimeout. The lowest accepted validation timeout is 1000ms (1 second). Default: 5000
validationTimeout = 5000

# 10 minutes: This property controls the maximum amount of time that a connection is allowed to sit idle in the pool.
# Whether a connection is retired as idle or not is subject to a maximum variation of +30 seconds, and average variation
# of +15 seconds. A connection will never be retired as idle before this timeout. A value of 0 means that idle connections
# are never removed from the pool. Default: 600000 (10 minutes)
idleTimeout = 600000

# 30 minutes: This property controls the maximum lifetime of a connection in the pool. When a connection reaches this timeout
# it will be retired from the pool, subject to a maximum variation of +30 seconds. An in-use connection will never be retired,
# only when it is closed will it then be removed. We strongly recommend setting this value, and it should be at least 30 seconds
# less than any database-level connection timeout. A value of 0 indicates no maximum lifetime (infinite lifetime),
# subject of course to the idleTimeout setting. Default: 1800000 (30 minutes)
maxLifetime = 1800000

# This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a
# possible connection leak. A value of 0 means leak detection is disabled.
# Lowest acceptable value for enabling leak detection is 2000 (2 secs). Default: 0
leakDetectionThreshold = 0

# ensures that the database does not get dropped while we are using it
keepAliveConnection = on

# See some tips on thread/connection pool sizing on https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
# Keep in mind that the number of threads must equal the maximum number of connections.
numThreads = 20
maxConnections = 20
minConnections = 20
}
}
}
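
For orientation, here is a minimal sketch of how an application might obtain and use a store registered under the plugin id configured above. It assumes Akka's akka.persistence.state registry API from Akka 2.6.x; whether this initial PR already wires the registry this way is not shown in the diff.

import akka.actor.ActorSystem
import akka.persistence.state.DurableStateStoreRegistry
import akka.persistence.state.scaladsl.DurableStateUpdateStore

object DurableStateExample extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  // Look up the store registered under the plugin id configured above.
  val store = DurableStateStoreRegistry(system)
    .durableStateStoreFor[DurableStateUpdateStore[String]]("jdbc-durable-state-store")

  // Upsert one state object, then read it back (revision 1 for the first write).
  for {
    _ <- store.upsertObject("cart-1", revision = 1L, value = "cart-contents", tag = "cart")
    result <- store.getObject("cart-1")
  } println(s"revision=${result.revision} value=${result.value}")
}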
14 changes: 14 additions & 0 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
@@ -39,3 +39,17 @@ CREATE TABLE IF NOT EXISTS "snapshot" (
PRIMARY KEY("persistence_id","sequence_number")
);

CREATE TABLE IF NOT EXISTS "state" (
"global_offset" BIGINT NOT NULL AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BLOB NOT NULL,
"state_serial_id" INTEGER NOT NULL,
"state_serial_manifest" VARCHAR,
"tag" VARCHAR,
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
);
CREATE INDEX "state_tag_idx" on "state" ("tag");
Comment on lines +49 to +53 (from a project member):
In the discussion about dynamic projections we said that we should mark each row in the event journal with the type_hint from the entity that originated the event.

As already discussed in akka/akka, the reason to have only one tag is that the usage here is to identify the type of the state, so one can query all durable states of some type.

So, maybe a better option here is to follow the type hint proposal.
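
For context, the tag-based lookup the comment refers to could look like this hedged sketch; it assumes the DurableStateStoreQuery API from akka-persistence-query, which may differ from what this PR ultimately exposes:

import akka.NotUsed
import akka.persistence.query.{ DurableStateChange, NoOffset }
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.stream.scaladsl.Source

// Stream the current durable states carrying the given type tag.
def statesOfType[A](store: DurableStateStoreQuery[A], tag: String): Source[DurableStateChange[A], NotUsed] =
  store.currentChanges(tag, NoOffset)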



1 change: 1 addition & 0 deletions core/src/main/resources/schema/h2/h2-drop-schema.sql
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS PUBLIC."event_tag";
DROP TABLE IF EXISTS PUBLIC."event_journal";
DROP TABLE IF EXISTS PUBLIC."snapshot";
DROP TABLE IF EXISTS PUBLIC."state";
13 changes: 13 additions & 0 deletions core/src/main/resources/schema/postgres/postgres-create-schema.sql
@@ -47,3 +47,16 @@ CREATE TABLE IF NOT EXISTS public.snapshot (
PRIMARY KEY(persistence_id, sequence_number)
);

CREATE TABLE IF NOT EXISTS "state" (
"global_offset" BIGSERIAL,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BYTEA NOT NULL,
"state_serial_id" INTEGER NOT NULL,
"state_serial_manifest" VARCHAR(255),
"tag" VARCHAR,
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
);
CREATE INDEX CONCURRENTLY "state_tag_idx" on "state" ("tag");
CREATE INDEX CONCURRENTLY "state_global_offset_idx" on "state" ("global_offset");
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS public.event_tag;
DROP TABLE IF EXISTS public.event_journal;
DROP TABLE IF EXISTS public.snapshot;
DROP TABLE IF EXISTS public.state;

@@ -3,10 +3,11 @@
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

-package akka.persistence.jdbc.journal.dao
+package akka.persistence.jdbc

import akka.annotation.InternalApi
import akka.persistence.PersistentRepr
import akka.persistence.jdbc.state.DurableStateTables
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
import akka.serialization.{ Serialization, Serializers }

@@ -57,4 +58,8 @@ object AkkaSerialization {
} yield (withMeta.withTimestamp(row.writeTimestamp), row.ordering)
}
}

def fromDurableStateRow(serialization: Serialization)(row: DurableStateTables.DurableStateRow): Try[AnyRef] = {
serialization.deserialize(row.statePayload, row.stateSerId, row.stateSerManifest.getOrElse(""))
}
}
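
A hedged sketch of a caller for the new helper, assuming the DurableStateRow shape used above (the deserializing DAO itself is not part of this hunk):

import akka.actor.ActorSystem
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.state.DurableStateTables
import akka.serialization.SerializationExtension
import scala.util.Try

// Hypothetical caller: turn a fetched row back into the persisted object.
def deserializeState(system: ActorSystem, row: DurableStateTables.DurableStateRow): Try[AnyRef] =
  AkkaSerialization.fromDurableStateRow(SerializationExtension(system))(row)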
@@ -194,3 +194,42 @@ class ReadJournalConfig(config: Config) {
override def toString: String =
s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)"
}

class DurableStateTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.state.columnNames")
val globalOffset: String = cfg.getString("globalOffset")
val persistenceId: String = cfg.getString("persistenceId")
val statePayload: String = cfg.getString("statePayload")
val tag: String = cfg.getString("tag")
val revision: String = cfg.getString("revision")
val stateSerId: String = cfg.getString("stateSerId")
val stateSerManifest: String = cfg.getString("stateSerManifest")
val stateTimestamp: String = cfg.getString("stateTimestamp")
}

class DurableStateTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.state")
val refreshInterval: FiniteDuration = config.asFiniteDuration("refreshInterval")
val batchSize: Int = config.getInt("batchSize")
val tableName: String = cfg.getString("tableName")
val schemaName: Option[String] = cfg.asStringOption("schemaName")
val columnNames: DurableStateTableColumnNames = new DurableStateTableColumnNames(config)
val stateSequenceConfig = DurableStateSequenceRetrievalConfig(config)
override def toString: String = s"DurableStateTableConfiguration($tableName,$schemaName,$columnNames)"
}

object DurableStateSequenceRetrievalConfig {
def apply(config: Config): DurableStateSequenceRetrievalConfig =
DurableStateSequenceRetrievalConfig(
batchSize = config.getInt("durable-state-sequence-retrieval.batch-size"),
maxTries = config.getInt("durable-state-sequence-retrieval.max-tries"),
queryDelay = config.asFiniteDuration("durable-state-sequence-retrieval.query-delay"),
maxBackoffQueryDelay = config.asFiniteDuration("durable-state-sequence-retrieval.max-backoff-query-delay"),
askTimeout = config.asFiniteDuration("durable-state-sequence-retrieval.ask-timeout"))
}
case class DurableStateSequenceRetrievalConfig(
batchSize: Int,
maxTries: Int,
queryDelay: FiniteDuration,
maxBackoffQueryDelay: FiniteDuration,
askTimeout: FiniteDuration)
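
A minimal sketch of how these classes pick values out of the reference.conf block added above, assuming they live in the same akka.persistence.jdbc.config package as ReadJournalConfig:

import com.typesafe.config.ConfigFactory
import akka.persistence.jdbc.config.DurableStateTableConfiguration

object DurableStateConfigExample extends App {
  // Hand the plugin's config subtree to the new configuration class.
  val pluginConfig = ConfigFactory.load().getConfig("jdbc-durable-state-store")
  val stateConfig = new DurableStateTableConfiguration(pluginConfig)
  println(stateConfig.tableName)           // "state"
  println(stateConfig.batchSize)           // 500
  println(stateConfig.stateSequenceConfig) // batch-size, max-tries, delays, ask-timeout
}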
@@ -6,6 +6,7 @@
package akka.persistence.jdbc.journal.dao

import akka.NotUsed
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
import akka.persistence.journal.Tagged
@@ -6,8 +6,9 @@
package akka.persistence.jdbc.query.dao
import akka.NotUsed
import akka.persistence.PersistentRepr
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.ReadJournalConfig
-import akka.persistence.jdbc.journal.dao.{ AkkaSerialization, BaseJournalDaoWithReadMessages, H2Compat }
+import akka.persistence.jdbc.journal.dao.{ BaseJournalDaoWithReadMessages, H2Compat }
import akka.serialization.Serialization
import akka.stream.Materializer
import akka.stream.scaladsl.Source
@@ -12,7 +12,7 @@ import akka.serialization.Serialization
import akka.stream.Materializer
import SnapshotTables._
import akka.dispatch.ExecutionContexts
-import akka.persistence.jdbc.journal.dao.AkkaSerialization
+import akka.persistence.jdbc.AkkaSerialization

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Success, Try }