From 3bf7b7b32ff16b784f78790544c4a16e901d84b1 Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Mon, 7 Jun 2021 21:55:09 +0530 Subject: [PATCH] Initial implementation of jdbc based DurableStateStore and query --- .gitignore | 3 + ...calaJdbcDurableStateChangesByTagTest.scala | 12 + core/src/main/resources/reference.conf | 122 +++++- .../resources/schema/h2/h2-create-schema.sql | 14 + .../resources/schema/h2/h2-drop-schema.sql | 1 + .../postgres/postgres-create-schema.sql | 13 + .../schema/postgres/postgres-drop-schema.sql | 1 + .../{journal/dao => }/AkkaSerialization.scala | 7 +- .../jdbc/config/AkkaPersistenceConfig.scala | 39 ++ .../jdbc/journal/dao/DefaultJournalDao.scala | 1 + .../query/dao/DefaultReadJournalDao.scala | 3 +- .../snapshot/dao/DefaultSnapshotDao.scala | 2 +- .../jdbc/state/DurableStateQueries.scala | 116 ++++++ .../state/DurableStateStoreProvider.scala | 37 ++ .../jdbc/state/DurableStateTables.scala | 53 +++ .../persistence/jdbc/state/OffsetOps.scala | 20 + .../jdbc/state/SequenceNextValUpdater.scala | 34 ++ .../state/javadsl/JdbcDurableStateStore.scala | 47 +++ .../scaladsl/DurableStateSequenceActor.scala | 186 +++++++++ .../scaladsl/JdbcDurableStateStore.scala | 215 ++++++++++ core/src/test/resources/h2-application.conf | 5 + .../test/resources/postgres-application.conf | 5 + .../postgres-shared-db-application.conf | 5 + .../persistence/jdbc/state/Payloads.scala | 28 ++ .../state/scaladsl/DataGenerationHelper.scala | 62 +++ .../DurableStateSequenceActorTest.scala | 188 +++++++++ .../state/scaladsl/JdbcDurableStateSpec.scala | 393 ++++++++++++++++++ .../jdbc/state/scaladsl/StateSpecBase.scala | 120 ++++++ .../TestProbeDurableStateStoreQuery.scala | 51 +++ project/Dependencies.scala | 2 +- 30 files changed, 1780 insertions(+), 5 deletions(-) create mode 100644 core/src/it/scala/akka/persistence/jdbc/integration/PostgresScalaJdbcDurableStateChangesByTagTest.scala rename core/src/main/scala/akka/persistence/jdbc/{journal/dao => }/AkkaSerialization.scala (86%) create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/DurableStateStoreProvider.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/OffsetOps.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala create mode 100644 core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala create mode 100644 core/src/test/scala/akka/persistence/jdbc/state/Payloads.scala create mode 100644 core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DataGenerationHelper.scala create mode 100644 core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala create mode 100644 core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala create mode 100644 core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala create mode 100644 core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala diff --git a/.gitignore b/.gitignore index 5049bad49..20ea198d9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ target .project .worksheet .bsp 
+*.code-workspace +.bloop +.metals diff --git a/core/src/it/scala/akka/persistence/jdbc/integration/PostgresScalaJdbcDurableStateChangesByTagTest.scala b/core/src/it/scala/akka/persistence/jdbc/integration/PostgresScalaJdbcDurableStateChangesByTagTest.scala new file mode 100644 index 000000000..80413597f --- /dev/null +++ b/core/src/it/scala/akka/persistence/jdbc/integration/PostgresScalaJdbcDurableStateChangesByTagTest.scala @@ -0,0 +1,12 @@ +package akka.persistence.jdbc.integration + +import com.typesafe.config.ConfigFactory +import akka.actor.ActorSystem +import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec +import akka.persistence.jdbc.testkit.internal.Postgres + +class PostgresScalaJdbcDurableStateStoreQueryTest + extends JdbcDurableStateSpec(ConfigFactory.load("postgres-shared-db-application.conf"), Postgres) { + implicit lazy val system: ActorSystem = + ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers)) +} \ No newline at end of file diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index cc9f02c78..4a0767f33 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -450,7 +450,7 @@ jdbc-read-journal { query-delay = 1 second # The maximum backoff time before trying to query again in case of database failures max-backoff-query-delay = 1 minute - # The ask timeout to use when querying the journal sequence actor, the actor should normally repond very quickly, + # The ask timeout to use when querying the journal sequence actor, the actor should normally respond very quickly, # since it always replies with its current internal state ask-timeout = 1 second } @@ -541,3 +541,123 @@ jdbc-read-journal { } } } + +# the akka-persistence-durable-state-store in use +jdbc-durable-state-store { + class = "akka.persistence.jdbc.state.scaladsl.DurableStateStore" + + # number of records fetched from the store at once + batchSize = 500 + # New states are retrieved (polled) with this interval. + refreshInterval = "1s" + + tables { + state { + tableName = "state" + schemaName = "" + columnNames { + globalOffset = "global_offset" + persistenceId = "persistence_id" + statePayload = "state_payload" + tag = "tag" + revision = "revision" + stateSerId = "state_serial_id" + stateSerManifest = "state_serial_manifest" + stateTimestamp = "state_timestamp" + } + } + } + + # Settings for determining if ids (ordering column) in the durable-state are out of sequence. + durable-state-sequence-retrieval { + # The maximum number of ids that will be retrieved in each batch + batch-size = 10000 + # In case a number in the sequence is missing, this is the ammount of retries that will be done to see + # if the number is still found. Note that the time after which a number in the sequence is assumed missing is + # equal to maxTries * queryDelay + # (maxTries may not be zero) + max-tries = 10 + # How often the actor will query for new data + query-delay = 1 second + # The maximum backoff time before trying to query again in case of database failures + max-backoff-query-delay = 1 minute + # The ask timeout to use when querying the durable-state sequence actor, the actor should normally respond very quickly, + # since it always replies with its current internal state + ask-timeout = 1 second + } + + slick { + + # This property indicates which profile must be used by Slick. 
+ # Possible values are: + # - slick.jdbc.PostgresProfile$ + # - slick.jdbc.MySQLProfile$ + # - slick.jdbc.H2Profile$ + # - slick.jdbc.SQLServerProfile$ + # - slick.jdbc.OracleProfile$ + # (uncomment and set the property below to match your needs) + # profile = "slick.jdbc.PostgresProfile$" + + db { + connectionPool = "HikariCP" + + # The JDBC URL for the chosen database + # (uncomment and set the property below to match your needs) + # url = "jdbc:postgresql://localhost:5432/akka-plugin" + + # The database username + # (uncomment and set the property below to match your needs) + # user = "akka-plugin" + + # The username's password + # (uncomment and set the property below to match your needs) + # password = "akka-plugin" + + # The JDBC driver to use + # (uncomment and set the property below to match your needs) + # driver = "org.postgresql.Driver" + + # hikariCP settings; see: https://github.com/brettwooldridge/HikariCP + # Slick will use an async executor with a fixed size queue of 10.000 objects + # The async executor is a connection pool for asynchronous execution of blocking I/O actions. + # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. + queueSize = 10000 // number of objects that can be queued by the async exector + + # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection + # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. + # 1000ms is the minimum value. Default: 180000 (3 minutes) + connectionTimeout = 180000 + + # This property controls the maximum amount of time that a connection will be tested for aliveness. + # This value must be less than the connectionTimeout. The lowest accepted validation timeout is 1000ms (1 second). Default: 5000 + validationTimeout = 5000 + + # 10 minutes: This property controls the maximum amount of time that a connection is allowed to sit idle in the pool. + # Whether a connection is retired as idle or not is subject to a maximum variation of +30 seconds, and average variation + # of +15 seconds. A connection will never be retired as idle before this timeout. A value of 0 means that idle connections + # are never removed from the pool. Default: 600000 (10 minutes) + idleTimeout = 600000 + + # 30 minutes: This property controls the maximum lifetime of a connection in the pool. When a connection reaches this timeout + # it will be retired from the pool, subject to a maximum variation of +30 seconds. An in-use connection will never be retired, + # only when it is closed will it then be removed. We strongly recommend setting this value, and it should be at least 30 seconds + # less than any database-level connection timeout. A value of 0 indicates no maximum lifetime (infinite lifetime), + # subject of course to the idleTimeout setting. Default: 1800000 (30 minutes) + maxLifetime = 1800000 + + # This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a + # possible connection leak. A value of 0 means leak detection is disabled. + # Lowest acceptable value for enabling leak detection is 2000 (2 secs). 
Default: 0 + leakDetectionThreshold = 0 + + # ensures that the database does not get dropped while we are using it + keepAliveConnection = on + + # See some tips on thread/connection pool sizing on https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing + # Keep in mind that the number of threads must equal the maximum number of connections. + numThreads = 20 + maxConnections = 20 + minConnections = 20 + } + } +} diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql b/core/src/main/resources/schema/h2/h2-create-schema.sql index 8e4441c9b..afda0a286 100644 --- a/core/src/main/resources/schema/h2/h2-create-schema.sql +++ b/core/src/main/resources/schema/h2/h2-create-schema.sql @@ -39,3 +39,17 @@ CREATE TABLE IF NOT EXISTS "snapshot" ( PRIMARY KEY("persistence_id","sequence_number") ); +CREATE TABLE IF NOT EXISTS "state" ( + "global_offset" BIGINT NOT NULL AUTO_INCREMENT, + "persistence_id" VARCHAR(255) NOT NULL, + "revision" BIGINT NOT NULL, + "state_payload" BLOB NOT NULL, + "state_serial_id" INTEGER NOT NULL, + "state_serial_manifest" VARCHAR, + "tag" VARCHAR, + "state_timestamp" BIGINT NOT NULL, + PRIMARY KEY("persistence_id") + ); +CREATE INDEX "state_tag_idx" on "state" ("tag"); + + diff --git a/core/src/main/resources/schema/h2/h2-drop-schema.sql b/core/src/main/resources/schema/h2/h2-drop-schema.sql index 4a7fdd708..b18d74de7 100644 --- a/core/src/main/resources/schema/h2/h2-drop-schema.sql +++ b/core/src/main/resources/schema/h2/h2-drop-schema.sql @@ -1,3 +1,4 @@ DROP TABLE IF EXISTS PUBLIC."event_tag"; DROP TABLE IF EXISTS PUBLIC."event_journal"; DROP TABLE IF EXISTS PUBLIC."snapshot"; +DROP TABLE IF EXISTS PUBLIC."state"; diff --git a/core/src/main/resources/schema/postgres/postgres-create-schema.sql b/core/src/main/resources/schema/postgres/postgres-create-schema.sql index ddf129ae4..3bc0eeca7 100644 --- a/core/src/main/resources/schema/postgres/postgres-create-schema.sql +++ b/core/src/main/resources/schema/postgres/postgres-create-schema.sql @@ -47,3 +47,16 @@ CREATE TABLE IF NOT EXISTS public.snapshot ( PRIMARY KEY(persistence_id, sequence_number) ); +CREATE TABLE IF NOT EXISTS "state" ( + "global_offset" BIGSERIAL, + "persistence_id" VARCHAR(255) NOT NULL, + "revision" BIGINT NOT NULL, + "state_payload" BYTEA NOT NULL, + "state_serial_id" INTEGER NOT NULL, + "state_serial_manifest" VARCHAR(255), + "tag" VARCHAR, + "state_timestamp" BIGINT NOT NULL, + PRIMARY KEY("persistence_id") + ); +CREATE INDEX CONCURRENTLY "state_tag_idx" on "state" ("tag"); +CREATE INDEX CONCURRENTLY "state_global_offset_idx" on "state" ("global_offset"); diff --git a/core/src/main/resources/schema/postgres/postgres-drop-schema.sql b/core/src/main/resources/schema/postgres/postgres-drop-schema.sql index 64154f80f..2aa11c1bf 100644 --- a/core/src/main/resources/schema/postgres/postgres-drop-schema.sql +++ b/core/src/main/resources/schema/postgres/postgres-drop-schema.sql @@ -1,4 +1,5 @@ DROP TABLE IF EXISTS public.event_tag; DROP TABLE IF EXISTS public.event_journal; DROP TABLE IF EXISTS public.snapshot; +DROP TABLE IF EXISTS public.state; diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/AkkaSerialization.scala b/core/src/main/scala/akka/persistence/jdbc/AkkaSerialization.scala similarity index 86% rename from core/src/main/scala/akka/persistence/jdbc/journal/dao/AkkaSerialization.scala rename to core/src/main/scala/akka/persistence/jdbc/AkkaSerialization.scala index 43954148b..2030999d9 100644 --- 
a/core/src/main/scala/akka/persistence/jdbc/journal/dao/AkkaSerialization.scala +++ b/core/src/main/scala/akka/persistence/jdbc/AkkaSerialization.scala @@ -3,10 +3,11 @@ * Copyright (C) 2019 - 2021 Lightbend Inc. */ -package akka.persistence.jdbc.journal.dao +package akka.persistence.jdbc import akka.annotation.InternalApi import akka.persistence.PersistentRepr +import akka.persistence.jdbc.state.DurableStateTables import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow import akka.serialization.{ Serialization, Serializers } @@ -57,4 +58,8 @@ object AkkaSerialization { } yield (withMeta.withTimestamp(row.writeTimestamp), row.ordering) } } + + def fromDurableStateRow(serialization: Serialization)(row: DurableStateTables.DurableStateRow): Try[AnyRef] = { + serialization.deserialize(row.statePayload, row.stateSerId, row.stateSerManifest.getOrElse("")) + } } diff --git a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala index 80c0ba876..79ffd4973 100644 --- a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala @@ -194,3 +194,42 @@ class ReadJournalConfig(config: Config) { override def toString: String = s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)" } + +class DurableStateTableColumnNames(config: Config) { + private val cfg = config.getConfig("tables.state.columnNames") + val globalOffset: String = cfg.getString("globalOffset") + val persistenceId: String = cfg.getString("persistenceId") + val statePayload: String = cfg.getString("statePayload") + val tag: String = cfg.getString("tag") + val revision: String = cfg.getString("revision") + val stateSerId: String = cfg.getString("stateSerId") + val stateSerManifest: String = cfg.getString("stateSerManifest") + val stateTimestamp: String = cfg.getString("stateTimestamp") +} + +class DurableStateTableConfiguration(config: Config) { + private val cfg = config.getConfig("tables.state") + val refreshInterval: FiniteDuration = config.asFiniteDuration("refreshInterval") + val batchSize: Int = config.getInt("batchSize") + val tableName: String = cfg.getString("tableName") + val schemaName: Option[String] = cfg.asStringOption("schemaName") + val columnNames: DurableStateTableColumnNames = new DurableStateTableColumnNames(config) + val stateSequenceConfig = DurableStateSequenceRetrievalConfig(config) + override def toString: String = s"DurableStateTableConfiguration($tableName,$schemaName,$columnNames)" +} + +object DurableStateSequenceRetrievalConfig { + def apply(config: Config): DurableStateSequenceRetrievalConfig = + DurableStateSequenceRetrievalConfig( + batchSize = config.getInt("durable-state-sequence-retrieval.batch-size"), + maxTries = config.getInt("durable-state-sequence-retrieval.max-tries"), + queryDelay = config.asFiniteDuration("durable-state-sequence-retrieval.query-delay"), + maxBackoffQueryDelay = config.asFiniteDuration("durable-state-sequence-retrieval.max-backoff-query-delay"), + askTimeout = config.asFiniteDuration("durable-state-sequence-retrieval.ask-timeout")) +} +case class DurableStateSequenceRetrievalConfig( + batchSize: Int, + maxTries: Int, + queryDelay: FiniteDuration, + maxBackoffQueryDelay: FiniteDuration, + askTimeout: FiniteDuration) diff --git 
a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala index d6fc9f369..1f2421678 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -6,6 +6,7 @@ package akka.persistence.jdbc.journal.dao import akka.NotUsed +import akka.persistence.jdbc.AkkaSerialization import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig } import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow import akka.persistence.journal.Tagged diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala index 26fbeb797..c4a70efed 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala @@ -6,8 +6,9 @@ package akka.persistence.jdbc.query.dao import akka.NotUsed import akka.persistence.PersistentRepr +import akka.persistence.jdbc.AkkaSerialization import akka.persistence.jdbc.config.ReadJournalConfig -import akka.persistence.jdbc.journal.dao.{ AkkaSerialization, BaseJournalDaoWithReadMessages, H2Compat } +import akka.persistence.jdbc.journal.dao.{ BaseJournalDaoWithReadMessages, H2Compat } import akka.serialization.Serialization import akka.stream.Materializer import akka.stream.scaladsl.Source diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala index 70e197adf..502b41c18 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala @@ -12,7 +12,7 @@ import akka.serialization.Serialization import akka.stream.Materializer import SnapshotTables._ import akka.dispatch.ExecutionContexts -import akka.persistence.jdbc.journal.dao.AkkaSerialization +import akka.persistence.jdbc.AkkaSerialization import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Success, Try } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala new file mode 100644 index 000000000..338e92b59 --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.state + +import slick.jdbc.{ JdbcProfile, SetParameter } +import slick.jdbc.H2Profile +import slick.jdbc.MySQLProfile +import slick.jdbc.OracleProfile +import slick.jdbc.PostgresProfile +import slick.jdbc.SQLServerProfile +import akka.persistence.jdbc.config.DurableStateTableConfiguration + +class DurableStateQueries(val profile: JdbcProfile, override val durableStateTableCfg: DurableStateTableConfiguration) + extends DurableStateTables { + import profile.api._ + + private def slickProfileToSchemaType(profile: JdbcProfile): String = + profile match { + case PostgresProfile => "Postgres" + case MySQLProfile => "MySQL" + case OracleProfile => "Oracle" + case SQLServerProfile => "SqlServer" + case H2Profile => "H2" + case _ => throw new IllegalArgumentException(s"Unknown JdbcProfile $profile encountered") + } + + lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match { + case "H2" => new H2SequenceNextValUpdater(profile) + case "Postgres" => new PostgresSequenceNextValUpdater(profile) + case _ => ??? + } + + implicit val uuidSetter = SetParameter[Array[Byte]] { case (bytes, params) => + params.setBytes(bytes) + } + + private[jdbc] def selectFromDbByPersistenceId(persistenceId: Rep[String]) = + durableStateTable.filter(_.persistenceId === persistenceId) + + private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = { + + sqlu"""INSERT INTO state + ( + persistence_id, + global_offset, + revision, + state_payload, + state_serial_id, + state_serial_manifest, + tag, + state_timestamp + ) + VALUES + ( + ${row.persistenceId}, + #${seqNextValue}, + ${row.revision}, + ${row.statePayload}, + ${row.stateSerId}, + ${row.stateSerManifest}, + ${row.tag}, + #${System.currentTimeMillis()} + ) + """ + } + + private[jdbc] def updateDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = { + sqlu"""UPDATE state + SET global_offset = #${seqNextValue}, + revision = ${row.revision}, + state_payload = ${row.statePayload}, + state_serial_id = ${row.stateSerId}, + state_serial_manifest = ${row.stateSerManifest}, + tag = ${row.tag}, + state_timestamp = ${System.currentTimeMillis} + WHERE persistence_id = ${row.persistenceId} + AND revision = ${row.revision} - 1 + """ + } + + private[jdbc] def getSequenceNextValueExpr() = sequenceNextValUpdater.getSequenceNextValueExpr() + + def deleteFromDb(persistenceId: String) = { + durableStateTable.filter(_.persistenceId === persistenceId).delete + } + + def deleteAllFromDb() = { + durableStateTable.delete + } + + private[jdbc] val maxOffsetQuery = Compiled { + durableStateTable.map(_.globalOffset).max.getOrElse(0L) + } + + private def _changesByTag( + tag: Rep[String], + offset: ConstColumn[Long], + maxOffset: ConstColumn[Long], + max: ConstColumn[Long]) = { + durableStateTable + .filter(_.tag === tag) + .sortBy(_.globalOffset.asc) + .filter(row => row.globalOffset > offset && row.globalOffset <= maxOffset) + .take(max) + } + + private[jdbc] val changesByTag = Compiled(_changesByTag _) + + private def _stateStoreSequenceQuery(from: ConstColumn[Long], limit: ConstColumn[Long]) = + durableStateTable.filter(_.globalOffset > from).map(_.globalOffset).sorted.take(limit) + + val stateStoreSequenceQuery = Compiled(_stateStoreSequenceQuery _) +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateStoreProvider.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateStoreProvider.scala new file mode 100644 index 
000000000..85d49b037 --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateStoreProvider.scala @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state + +import slick.jdbc.{ JdbcBackend, JdbcProfile } +import akka.persistence.state.DurableStateStoreProvider +import akka.persistence.jdbc.config.DurableStateTableConfiguration +import akka.persistence.state.scaladsl.DurableStateStore +import akka.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore } +import akka.serialization.Serialization +import akka.stream.{ Materializer, SystemMaterializer } +import akka.actor.ExtendedActorSystem + +import scala.concurrent.ExecutionContext + +class JdbcDurableStateStoreProvider[A]( + db: JdbcBackend#Database, + profile: JdbcProfile, + durableStateConfig: DurableStateTableConfiguration, + serialization: Serialization)(implicit val system: ExtendedActorSystem) + extends DurableStateStoreProvider { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val mat: Materializer = SystemMaterializer(system).materializer + + override val scaladslDurableStateStore: DurableStateStore[Any] = + new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization) + + override val javadslDurableStateStore: JDurableStateStore[AnyRef] = + new javadsl.JdbcDurableStateStore[AnyRef]( + profile, + durableStateConfig, + new scaladsl.JdbcDurableStateStore[AnyRef](db, profile, durableStateConfig, serialization)) +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala new file mode 100644 index 000000000..ed50a587a --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.state + +import akka.persistence.jdbc.config.DurableStateTableConfiguration + +object DurableStateTables { + case class DurableStateRow( + globalOffset: Long, + persistenceId: String, + revision: Long, + statePayload: Array[Byte], + tag: Option[String], + stateSerId: Int, + stateSerManifest: Option[String], + stateTimestamp: Long) +} + +trait DurableStateTables { + val profile: slick.jdbc.JdbcProfile + import profile.api._ + def durableStateTableCfg: DurableStateTableConfiguration + + import DurableStateTables._ + + class DurableState(_tableTag: Tag) + extends Table[DurableStateRow]( + _tableTag, + _schemaName = durableStateTableCfg.schemaName, + _tableName = durableStateTableCfg.tableName) { + + def * = + (globalOffset, persistenceId, revision, statePayload, tag, stateSerId, stateSerManifest, stateTimestamp) + .<>(DurableStateRow.tupled, DurableStateRow.unapply) + + val globalOffset: Rep[Long] = column[Long](durableStateTableCfg.columnNames.globalOffset, O.AutoInc) + val persistenceId: Rep[String] = + column[String](durableStateTableCfg.columnNames.persistenceId, O.PrimaryKey, O.Length(255, varying = true)) + val revision: Rep[Long] = column[Long](durableStateTableCfg.columnNames.revision) + val statePayload: Rep[Array[Byte]] = column[Array[Byte]](durableStateTableCfg.columnNames.statePayload) + val tag: Rep[Option[String]] = column[Option[String]](durableStateTableCfg.columnNames.tag) + val stateSerId: Rep[Int] = column[Int](durableStateTableCfg.columnNames.stateSerId) + val stateSerManifest: Rep[Option[String]] = + column[Option[String]](durableStateTableCfg.columnNames.stateSerManifest) + val stateTimestamp: Rep[Long] = column[Long](durableStateTableCfg.columnNames.stateTimestamp) + + val globalOffsetIdx = index(s"${tableName}_globalOffset_idx", globalOffset, unique = true) + } + lazy val durableStateTable = new TableQuery(new DurableState(_)) +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/OffsetOps.scala b/core/src/main/scala/akka/persistence/jdbc/state/OffsetOps.scala new file mode 100644 index 000000000..78999b0b2 --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/OffsetOps.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state + +import akka.persistence.query._ +object OffsetSyntax { + implicit class OffsetOps(val that: Offset) extends AnyVal { + def value = + that match { + case Sequence(offsetValue) => offsetValue + case NoOffset => 0L + case _ => + throw new IllegalArgumentException( + "akka-persistence-jdbc does not support " + that.getClass.getName + " offsets") + } + } +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala new file mode 100644 index 000000000..113dd2d52 --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.state + +import slick.jdbc.JdbcProfile +import slick.dbio.Effect +import slick.sql.SqlStreamingAction + +private[jdbc] trait SequenceNextValUpdater { + def getSequenceNextValueExpr(): SqlStreamingAction[Vector[String], String, Effect] +} + +class H2SequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater { + import profile.api._ + + // H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column) + def getSequenceNextValueExpr() = { + sql"""SELECT COLUMN_DEFAULT + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'state' + AND COLUMN_NAME = 'global_offset' + AND TABLE_SCHEMA = 'PUBLIC'""".as[String] + } +} + +class PostgresSequenceNextValUpdater(profile: JdbcProfile) extends SequenceNextValUpdater { + import profile.api._ + final val nextValFetcher = s"""(SELECT nextval(pg_get_serial_sequence('state', 'global_offset')))""" + + def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala new file mode 100644 index 000000000..720f4744f --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state.javadsl + +import java.util.Optional +import java.util.concurrent.CompletionStage +import scala.compat.java8.FutureConverters._ +import scala.concurrent.ExecutionContext +import slick.jdbc.JdbcProfile +import akka.{ Done, NotUsed } +import akka.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult } +import akka.persistence.jdbc.state.DurableStateQueries +import akka.persistence.jdbc.config.DurableStateTableConfiguration +import akka.persistence.query.{ DurableStateChange, Offset } +import akka.persistence.query.javadsl.DurableStateStoreQuery +import akka.stream.javadsl.Source + +class JdbcDurableStateStore[A]( + profile: JdbcProfile, + durableStateConfig: DurableStateTableConfiguration, + scalaStore: akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore[A])(implicit ec: ExecutionContext) + extends DurableStateUpdateStore[A] + with DurableStateStoreQuery[A] { + + val queries = new DurableStateQueries(profile, durableStateConfig) + + def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] = + toJava( + scalaStore + .getObject(persistenceId) + .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision))) + + def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = + toJava(scalaStore.upsertObject(persistenceId, revision, value, tag)) + + def deleteObject(persistenceId: String): CompletionStage[Done] = + toJava(scalaStore.deleteObject(persistenceId)) + + def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = + scalaStore.currentChanges(tag, offset).asJava + + def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = + scalaStore.changes(tag, offset).asJava +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala new file mode 100644 index 000000000..31f1d63fc --- /dev/null +++ 
b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala @@ -0,0 +1,186 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state.scaladsl + +import akka.actor.{ Actor, ActorLogging, Props, Status, Timers } +import akka.pattern.pipe +import akka.persistence.jdbc.config.DurableStateSequenceRetrievalConfig +import akka.stream.Materializer +import akka.stream.scaladsl.Sink + +import scala.collection.immutable.NumericRange +import scala.concurrent.duration.FiniteDuration + +object DurableStateSequenceActor { + def props[A](stateStore: JdbcDurableStateStore[A], config: DurableStateSequenceRetrievalConfig)( + implicit materializer: Materializer): Props = Props(new DurableStateSequenceActor(stateStore, config)) + + private case object QueryOrderingIds + private case class NewOrderingIds(originalOffset: Long, elements: Seq[OrderingId]) + + private case class ScheduleAssumeMaxOrderingId(max: OrderingId) + private case class AssumeMaxOrderingId(max: OrderingId) + + case object GetMaxOrderingId + case class MaxOrderingId(maxOrdering: OrderingId) + + private case object QueryOrderingIdsTimerKey + private case object AssumeMaxOrderingIdTimerKey + + private type OrderingId = Long + + /** + * Efficient representation of missing elements using NumericRanges. + * It can be seen as a collection of OrderingIds + */ + private case class MissingElements(elements: Seq[NumericRange[OrderingId]]) { + def addRange(from: OrderingId, until: OrderingId): MissingElements = { + val newRange = from.until(until) + MissingElements(elements :+ newRange) + } + def contains(id: OrderingId): Boolean = elements.exists(_.containsTyped(id)) + def isEmpty: Boolean = elements.forall(_.isEmpty) + } + private object MissingElements { + def empty: MissingElements = MissingElements(Vector.empty) + } +} + +/** + * To support the changesByTag query, this actor keeps track of which rows are visible in the database. + * This is required to guarantee the changesByTag does not skip any rows in case rows with a higher (ordering) id are + * visible in the database before rows with a lower (ordering) id. + */ +class DurableStateSequenceActor[A](stateStore: JdbcDurableStateStore[A], config: DurableStateSequenceRetrievalConfig)( + implicit materializer: Materializer) + extends Actor + with ActorLogging + with Timers { + import DurableStateSequenceActor._ + import context.dispatcher + import config.{ batchSize, maxBackoffQueryDelay, maxTries, queryDelay } + + override def receive: Receive = receive(0L, Map.empty, 0) + + override def preStart(): Unit = { + self ! QueryOrderingIds + stateStore.maxStateStoreOffset().mapTo[Long].onComplete { + case scala.util.Success(maxInDatabase) => + self ! ScheduleAssumeMaxOrderingId(maxInDatabase) + case scala.util.Failure(t) => + log.info("Failed to recover fast, using state-by-state recovery instead. Cause: {}", t) + } + } + + /** + * @param currentMaxOrdering The highest ordering value for which it is known that no missing elements exist + * @param missingByCounter A map with missing orderingIds. The key of the map is the count at which the missing elements + * can be assumed to be "skipped ids" (they are no longer assumed missing). 
+ * @param moduloCounter A counter which is incremented every time a new query has been executed, modulo `maxTries` + * @param previousDelay The last used delay (may change in case failures occur) + */ + final def receive( + currentMaxOrdering: OrderingId, + missingByCounter: Map[Int, MissingElements], + moduloCounter: Int, + previousDelay: FiniteDuration = queryDelay): Receive = { + case ScheduleAssumeMaxOrderingId(max) => + // All elements smaller than max can be assumed missing after this delay + val delay = queryDelay * maxTries + timers.startSingleTimer(key = AssumeMaxOrderingIdTimerKey, AssumeMaxOrderingId(max), delay) + + case AssumeMaxOrderingId(max) => + if (currentMaxOrdering < max) { + context.become(receive(max, missingByCounter, moduloCounter, previousDelay)) + } + + case GetMaxOrderingId => + sender() ! MaxOrderingId(currentMaxOrdering) + + case QueryOrderingIds => + stateStore + .stateStoreSequence(currentMaxOrdering, batchSize) + .runWith(Sink.seq) + .map(result => NewOrderingIds(currentMaxOrdering, result)) + .pipeTo(self) + + case NewOrderingIds(originalOffset, _) if originalOffset < currentMaxOrdering => + // search was done using an offset that became obsolete in the meantime + // therefore we start a new query + self ! QueryOrderingIds + + case NewOrderingIds(_, elements) => + findGaps(elements, currentMaxOrdering, missingByCounter, moduloCounter) + + case Status.Failure(t) => + val newDelay = maxBackoffQueryDelay.min(previousDelay * 2) + if (newDelay == maxBackoffQueryDelay) { + log.warning("Failed to query max ordering id because of {}, retrying in {}", t, newDelay) + } + scheduleQuery(newDelay) + context.become(receive(currentMaxOrdering, missingByCounter, moduloCounter, newDelay)) + } + + /** + * This method implements the "find gaps" algorithm. It's the meat and main purpose of this actor. + */ + final def findGaps( + elements: Seq[OrderingId], + currentMaxOrdering: OrderingId, + missingByCounter: Map[Int, MissingElements], + moduloCounter: Int): Unit = { + // list of elements that will be considered as genuine gaps. + // `givenUp` is either empty or was filled on a previous iteration + val givenUp = missingByCounter.getOrElse(moduloCounter, MissingElements.empty) + + val (nextMax, _, missingElems) = + // using the ordering elements that were fetched, we verify if there are any gaps + elements.foldLeft[(OrderingId, OrderingId, MissingElements)]( + (currentMaxOrdering, currentMaxOrdering, MissingElements.empty)) { + case ((currentMax, previousElement, missing), currentElement) => + // we must decide if we move the cursor forward + val newMax = + if ((currentMax + 1).until(currentElement).forall(givenUp.contains)) { + // we move the cursor forward when: + // 1) they have been detected as missing on a previous iteration, so it's now time to give up + // 2) current + 1 == currentElement (meaning no gap). Note that `forall` on an empty range always returns true + currentElement + } else currentMax + + // we accumulate in newMissing the gaps we detect on each iteration + val newMissing = + if (previousElement + 1 == currentElement || newMax == currentElement) missing + else missing.addRange(previousElement + 1, currentElement) + + (newMax, currentElement, newMissing) + } + + val newMissingByCounter = missingByCounter + (moduloCounter -> missingElems) + + // did we detect gaps in the current batch?
+ val noGapsFound = missingElems.isEmpty + + // full batch means that we retrieved as many elements as the batchSize + // that happens when we are not yet at the end of the stream + val isFullBatch = elements.size == batchSize + + if (noGapsFound && isFullBatch) { + // Many elements have been retrieved but none are missing + // We can query again immediately, as this allows the actor to rapidly retrieve the real max ordering + self ! QueryOrderingIds + context.become(receive(nextMax, newMissingByCounter, moduloCounter)) + } else { + // either we detected gaps or we reached the end of stream (batch not full) + // in this case we want to keep querying but not immediately + scheduleQuery(queryDelay) + context.become(receive(nextMax, newMissingByCounter, (moduloCounter + 1) % maxTries)) + } + } + + def scheduleQuery(delay: FiniteDuration): Unit = { + timers.startSingleTimer(key = QueryOrderingIdsTimerKey, QueryOrderingIds, delay) + } +} diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala new file mode 100644 index 000000000..c0ed8c26b --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala @@ -0,0 +1,215 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state.scaladsl + +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration._ +import scala.util.Try +import slick.jdbc.{ JdbcBackend, JdbcProfile } +import akka.{ Done, NotUsed } +import akka.actor.ExtendedActorSystem +import akka.pattern.ask +import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult } +import akka.persistence.jdbc.AkkaSerialization +import akka.persistence.jdbc.state.DurableStateQueries +import akka.persistence.jdbc.config.DurableStateTableConfiguration +import akka.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax } +import akka.persistence.query.{ DurableStateChange, Offset } +import akka.persistence.query.scaladsl.DurableStateStoreQuery +import akka.persistence.jdbc.journal.dao.FlowControl +import akka.serialization.Serialization +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.{ Materializer, SystemMaterializer } +import akka.util.Timeout +import DurableStateSequenceActor._ +import OffsetSyntax._ + +class JdbcDurableStateStore[A]( + db: JdbcBackend#Database, + profile: JdbcProfile, + durableStateConfig: DurableStateTableConfiguration, + serialization: Serialization)(implicit val system: ExtendedActorSystem) + extends DurableStateUpdateStore[A] + with DurableStateStoreQuery[A] { + import FlowControl._ + import profile.api._ + + implicit val ec: ExecutionContext = system.dispatcher + implicit val mat: Materializer = SystemMaterializer(system).materializer + + lazy val queries = new DurableStateQueries(profile, durableStateConfig) + + // Started lazily to prevent the actor from querying the db if no changesByTag queries are used + private[jdbc] lazy val stateSequenceActor = system.systemActorOf( + DurableStateSequenceActor.props(this, durableStateConfig.stateSequenceConfig), + s"akka-persistence-jdbc-durable-state-sequence-actor") + + def getObject(persistenceId: String): Future[GetObjectResult[A]] = { + db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map { rows => + rows.headOption match { + case Some(row) => + GetObjectResult(
AkkaSerialization.fromDurableStateRow(serialization)(row).toOption.asInstanceOf[Option[A]], + row.revision) + + case None => + GetObjectResult(None, 0) + } + } + } + + def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = { + require(revision > 0) + val row = + AkkaSerialization.serialize(serialization, value).map { serialized => + DurableStateTables.DurableStateRow( + 0, // insert 0 for autoinc columns + persistenceId, + revision, + serialized.payload, + Option(tag).filter(_.trim.nonEmpty), + serialized.serId, + Option(serialized.serManifest).filter(_.trim.nonEmpty), + System.currentTimeMillis) + } + + Future + .fromTry(row) + .flatMap { r => + val action = if (revision == 1) insertDurableState(r) else updateDurableState(r) + db.run(action) + } + .map { rowsAffected => + if (rowsAffected == 0) + throw new IllegalStateException( + s"Incorrect revision number [$revision] provided: It has to be 1 more than the value existing in the database for persistenceId [$persistenceId]") + else Done + } + } + + def deleteObject(persistenceId: String): Future[Done] = + db.run(queries.deleteFromDb(persistenceId).map(_ => Done)) + + def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = { + Source + .futureSource(maxStateStoreOffset().map { maxOrderingInDb => + changesByTag(tag, offset.value, terminateAfterOffset = Some(maxOrderingInDb)) + }) + .mapMaterializedValue(_ => NotUsed) + } + + def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = + changesByTag(tag, offset.value, terminateAfterOffset = None) + + private def currentChangesByTag( + tag: String, + from: Long, + batchSize: Long, + queryUntil: MaxOrderingId): Source[DurableStateChange[A], NotUsed] = { + if (queryUntil.maxOrdering < from) Source.empty + else changesByTagFromDb(tag, from, queryUntil.maxOrdering, batchSize).mapAsync(1)(Future.fromTry) + } + + private def changesByTagFromDb( + tag: String, + offset: Long, + maxOffset: Long, + batchSize: Long): Source[Try[DurableStateChange[A]], NotUsed] = { + Source + .fromPublisher(db.stream(queries.changesByTag((tag, offset, maxOffset, batchSize)).result)) + .map(toDurableStateChange) + } + + private[jdbc] def changesByTag( + tag: String, + offset: Long, + terminateAfterOffset: Option[Long]): Source[DurableStateChange[A], NotUsed] = { + + val batchSize = durableStateConfig.batchSize + val startingOffsets = List.empty[Long] + implicit val askTimeout: Timeout = Timeout(durableStateConfig.stateSequenceConfig.askTimeout) + + Source + .unfoldAsync[(Long, FlowControl, List[Long]), Seq[DurableStateChange[A]]]((offset, Continue, startingOffsets)) { + case (from, control, s) => + def retrieveNextBatch() = { + for { + queryUntil <- stateSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId] + xs <- currentChangesByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq) + } yield { + + val hasMoreEvents = xs.size == batchSize + val nextControl: FlowControl = + terminateAfterOffset match { + // we may stop if target is behind queryUntil and we don't have more events to fetch + case Some(target) if !hasMoreEvents && target <= queryUntil.maxOrdering => Stop + + // We may also stop if we have found an event with an offset >= target + case Some(target) if xs.exists(_.offset.value >= target) => Stop + + // otherwise, disregarding if Some or None, we must decide how to continue + case _ => + if (hasMoreEvents) Continue + else ContinueDelayed + } + val nextStartingOffset = if (xs.isEmpty) { + math.max(from.value, 
queryUntil.maxOrdering) + } else { + // Continue querying from the largest offset + xs.map(_.offset.value).max + } + Some(((nextStartingOffset, nextControl, s :+ nextStartingOffset), xs)) + } + } + + control match { + case Stop => Future.successful(None) + case Continue => retrieveNextBatch() + case ContinueDelayed => + akka.pattern.after(durableStateConfig.refreshInterval, system.scheduler)(retrieveNextBatch()) + } + } + .mapConcat(identity) + } + + private[jdbc] def maxStateStoreOffset(): Future[Long] = + db.run(queries.maxOffsetQuery.result) + + private[jdbc] def stateStoreSequence(offset: Long, limit: Long): Source[Long, NotUsed] = + Source.fromPublisher(db.stream(queries.stateStoreSequenceQuery((offset, limit)).result)) + + private def toDurableStateChange(row: DurableStateTables.DurableStateRow): Try[DurableStateChange[A]] = { + AkkaSerialization + .fromDurableStateRow(serialization)(row) + .map(payload => + new DurableStateChange( + row.persistenceId, + row.revision, + payload.asInstanceOf[A], + Offset.sequence(row.globalOffset), + row.stateTimestamp)) + } + + private def updateDurableState(row: DurableStateTables.DurableStateRow) = { + import queries._ + + for { + s <- getSequenceNextValueExpr() + u <- updateDbWithDurableState(row, s.head) + } yield u + } + + private def insertDurableState(row: DurableStateTables.DurableStateRow) = { + import queries._ + + for { + s <- getSequenceNextValueExpr() + u <- insertDbWithDurableState(row, s.head) + } yield u + } + + def deleteAllFromDb() = db.run(queries.deleteAllFromDb()) +} diff --git a/core/src/test/resources/h2-application.conf b/core/src/test/resources/h2-application.conf index 9a9b89ada..1d01979f7 100644 --- a/core/src/test/resources/h2-application.conf +++ b/core/src/test/resources/h2-application.conf @@ -44,6 +44,11 @@ jdbc-read-journal { slick = ${slick} } +# the akka-persistence-jdbc provider in use for durable state store +jdbc-durable-state-store { + slick = ${slick} +} + slick { profile = "slick.jdbc.H2Profile$" db { diff --git a/core/src/test/resources/postgres-application.conf b/core/src/test/resources/postgres-application.conf index 13c5225f9..789a94cb0 100644 --- a/core/src/test/resources/postgres-application.conf +++ b/core/src/test/resources/postgres-application.conf @@ -44,6 +44,11 @@ jdbc-read-journal { slick = ${slick} } +# the akka-persistence-jdbc provider in use for durable state store +jdbc-durable-state-store { + slick = ${slick} +} + slick { profile = "slick.jdbc.PostgresProfile$" db { diff --git a/core/src/test/resources/postgres-shared-db-application.conf b/core/src/test/resources/postgres-shared-db-application.conf index f58628cc7..f9821ff43 100644 --- a/core/src/test/resources/postgres-shared-db-application.conf +++ b/core/src/test/resources/postgres-shared-db-application.conf @@ -61,3 +61,8 @@ jdbc-snapshot-store { jdbc-read-journal { use-shared-db = "slick" } + +# the akka-persistence-jdbc provider in use for durable state store +jdbc-durable-state-store { + use-shared-db = "slick" +} diff --git a/core/src/test/scala/akka/persistence/jdbc/state/Payloads.scala b/core/src/test/scala/akka/persistence/jdbc/state/Payloads.scala new file mode 100644 index 000000000..e36026f56 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/state/Payloads.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.state + +import akka.serialization._ + +final case class MyPayload(data: String) + +class MyPayloadSerializer extends Serializer { + val MyPayloadClass = classOf[MyPayload] + + def identifier: Int = 77123 + def includeManifest: Boolean = true + + def toBinary(o: AnyRef): Array[Byte] = o match { + case MyPayload(data) => s"${data}".getBytes("UTF-8") + case _ => throw new Exception("Unknown object for serialization") + } + + def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match { + case Some(MyPayloadClass) => MyPayload(s"${new String(bytes, "UTF-8")}") + case Some(c) => throw new Exception(s"unexpected manifest ${c}") + case None => throw new Exception("no manifest") + } +} diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DataGenerationHelper.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DataGenerationHelper.scala new file mode 100644 index 000000000..632f4ecd0 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DataGenerationHelper.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state.scaladsl + +import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.{ ExecutionContext, Future } + +trait DataGenerationHelper extends ScalaFutures { + + // upsert multiple records for 1 persistence id + def upsertManyFor( + store: JdbcDurableStateStore[String], + persistenceId: String, + tag: String, + startIndex: Int, + n: Int) = { + (startIndex to startIndex + n - 1).map { c => + store.upsertObject(persistenceId, c, s"$c valid string", tag).futureValue + } + } + + private def times(n: Int, ls: List[String]) = ls.flatMap { List.fill(n)(_) } + + // upsert multiple records for a random shuffle of a list of persistence ids + def upsertRandomlyShuffledPersistenceIds( + store: JdbcDurableStateStore[String], + persistenceIds: List[String], + tag: String, + replicationFactor: Int) = { + val allPersistenceIds = scala.util.Random.shuffle(times(replicationFactor, persistenceIds)) + val m = collection.mutable.Map.empty[String, Long] + allPersistenceIds.map { p => + m.get(p) + .fold { + val _ = store.upsertObject(p, 1, s"1 valid string", tag).futureValue + m += ((p, 1)) + } { seq => + { + val _ = store.upsertObject(p, seq + 1, s"${seq + 1} valid string", tag).futureValue + m += ((p, seq + 1)) + } + } + } + } + + def upsertParallel(store: JdbcDurableStateStore[String], pids: Set[String], tag: String, noOfItems: Int)( + implicit ec: ExecutionContext) = { + + for { + _ <- Future.unit + f1 = Future(upsertManyFor(store, pids.head, tag, 1, noOfItems)) + f2 = Future(upsertManyFor(store, pids.tail.head, tag, 1, noOfItems)) + f3 = Future(upsertManyFor(store, pids.last, tag, 1, noOfItems)) + _ <- f1 + _ <- f2 + _ <- f3 + } yield (()) + } +} diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala new file mode 100644 index 000000000..6a79df0f9 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.state.scaladsl + +import scala.concurrent.Future +import scala.concurrent.duration._ +import com.typesafe.config.{ Config, ConfigFactory } +import akka.actor.{ ActorRef, ActorSystem, ExtendedActorSystem } +import akka.pattern.ask +import akka.persistence.jdbc.SharedActorSystemTestSpec +import akka.persistence.jdbc.state.scaladsl.DurableStateSequenceActor.{ GetMaxOrderingId, MaxOrderingId } +import akka.persistence.jdbc.testkit.internal.{ H2, SchemaType } +import akka.testkit.TestProbe +import akka.util.Timeout +import org.scalatest.concurrent.Eventually + +abstract class DurableStateSequenceActorTest(config: Config, schemaType: SchemaType) + extends StateSpecBase(config, schemaType) + with DataGenerationHelper + with Eventually { + + val durableStateSequenceActorConfig = durableStateConfig.stateSequenceConfig + + implicit val askTimeout = 50.millis + implicit val timeout: Timeout = Timeout(1.minute) + + "A DurableStateSequenceActor" must { + "recover normally" in { + withActorSystem { implicit system => + val store = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + upsertManyFor(store, "p1", "t1", 1, 400).size shouldBe 400 + + withDurableStateSequenceActor(store, maxTries = 100) { actor => + eventually { + actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue shouldBe MaxOrderingId(400) + } + } + } + } + } + + /** + * @param maxTries The number of tries before events are assumed missing + * (since the actor queries every second by default, + * this is effectively the number of seconds after which events are assumed missing) + */ + def withDurableStateSequenceActor(store: JdbcDurableStateStore[String], maxTries: Int)(f: ActorRef => Unit)( + implicit system: ActorSystem): Unit = { + val actor = + system.actorOf(DurableStateSequenceActor.props(store, durableStateSequenceActorConfig.copy(maxTries = maxTries))) + try f(actor) + finally system.stop(actor) + } +} + +class MockDurableStateSequenceActorTest extends SharedActorSystemTestSpec { + def fetchMaxOrderingId(durableStateSequenceActor: ActorRef): Future[Long] = { + durableStateSequenceActor.ask(GetMaxOrderingId)(20.millis).mapTo[MaxOrderingId].map(_.maxOrdering) + } + + it should "re-query with delay only when events are missing." 
in { + val batchSize = 100 + val maxTries = 5 + val queryDelay = 200.millis + + val almostQueryDelay = queryDelay - 50.millis + val almostImmediately = 50.millis + withTestProbeDurableStateSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, _) => + daoProbe.expectMsg(almostImmediately, TestProbeDurableStateStoreQuery.OffsetSequence(0, batchSize)) + val firstBatch = (1L to 40L) ++ (51L to 110L) + daoProbe.reply(firstBatch) + withClue(s"when events are missing, the actor should wait for $queryDelay before querying again") { + daoProbe.expectNoMessage(almostQueryDelay) + daoProbe.expectMsg(almostQueryDelay, TestProbeDurableStateStoreQuery.OffsetSequence(40, batchSize)) + } + // number 41 is still missing after this batch + val secondBatch = 42L to 110L + daoProbe.reply(secondBatch) + withClue(s"when events are missing, the actor should wait for $queryDelay before querying again") { + daoProbe.expectNoMessage(almostQueryDelay) + daoProbe.expectMsg(almostQueryDelay, TestProbeDurableStateStoreQuery.OffsetSequence(40, batchSize)) + } + val thirdBatch = 41L to 110L + daoProbe.reply(thirdBatch) + withClue( + s"when no more events are missing, but less that batchSize elemens have been received, " + + s"the actor should wait for $queryDelay before querying again") { + daoProbe.expectNoMessage(almostQueryDelay) + daoProbe.expectMsg(almostQueryDelay, TestProbeDurableStateStoreQuery.OffsetSequence(110, batchSize)) + } + + val fourthBatch = 111L to 210L + daoProbe.reply(fourthBatch) + withClue( + "When no more events are missing and the number of events received is equal to batchSize, " + + "the actor should query again immediately") { + daoProbe.expectMsg(almostImmediately, TestProbeDurableStateStoreQuery.OffsetSequence(210, batchSize)) + } + + // Reply to prevent a dead letter warning on the timeout + daoProbe.reply(Seq.empty) + daoProbe.expectNoMessage(almostImmediately) + } + } + + it should "Assume an element missing after the configured amount of maxTries" in { + val batchSize = 100 + val maxTries = 5 + val queryDelay = 150.millis + + val slightlyMoreThanQueryDelay = queryDelay + 50.millis + val almostImmediately = 20.millis + + val allIds = (1L to 40L) ++ (43L to 200L) + + withTestProbeDurableStateSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, actor) => + daoProbe.expectMsg(almostImmediately, TestProbeDurableStateStoreQuery.OffsetSequence(0, batchSize)) + daoProbe.reply(allIds.take(100)) + + val idsLargerThan40 = allIds.dropWhile(_ <= 40) + val retryResponse = idsLargerThan40.take(100) + for (i <- 1 to maxTries) withClue(s"should retry $maxTries times (attempt $i)") { + daoProbe.expectMsg(slightlyMoreThanQueryDelay, TestProbeDurableStateStoreQuery.OffsetSequence(40, batchSize)) + daoProbe.reply(retryResponse) + } + + // sanity check + retryResponse.last shouldBe 142 + withClue( + "The elements 41 and 42 should be assumed missing, " + + "the actor should query again immediately since a full batch has been received") { + daoProbe.expectMsg(almostImmediately, TestProbeDurableStateStoreQuery.OffsetSequence(142, batchSize)) + fetchMaxOrderingId(actor).futureValue shouldBe 142 + } + + // Reply to prevent a dead letter warning on the timeout + daoProbe.reply(Seq.empty) + daoProbe.expectNoMessage(almostImmediately) + } + } + + import akka.persistence.jdbc.config.DurableStateTableConfiguration + def withTestProbeDurableStateSequenceActor(batchSize: Int, maxTries: Int, queryDelay: FiniteDuration)( + f: (TestProbe, ActorRef) => Unit)(implicit system: ActorSystem): Unit = { + val 
testProbe = TestProbe() + + val customConfig = ConfigFactory.parseString(s""" + jdbc-durable-state-store { + schemaType = H2 + batchSize = ${batchSize} + refreshInterval = 500.milliseconds + durable-state-sequence-retrieval { + query-delay = ${queryDelay} + max-tries = ${maxTries} + max-backoff-query-delay = 4.seconds + ask-timeout = 100.millis + batch-size = ${batchSize} + } + } + """) + + lazy val cfg = customConfig + .getConfig("jdbc-durable-state-store") + .withFallback(system.settings.config.getConfig("jdbc-durable-state-store")) + .withFallback(ConfigFactory.load("h2-application.conf").getConfig("jdbc-durable-state-store")) + + val stateTableConfig = new DurableStateTableConfiguration(cfg) + + val mockStore = + new TestProbeDurableStateStoreQuery(testProbe, db, slick.jdbc.H2Profile, stateTableConfig, serialization)( + system.asInstanceOf[ExtendedActorSystem]) + val actor = system.actorOf( + DurableStateSequenceActor + .props(mockStore.asInstanceOf[JdbcDurableStateStore[String]], stateTableConfig.stateSequenceConfig)) + try f(testProbe, actor) + finally system.stop(actor) + } +} + +class H2DurableStateSequenceActorTest + extends DurableStateSequenceActorTest(ConfigFactory.load("h2-application.conf"), H2) { + implicit lazy val system: ActorSystem = + ActorSystem("test", config.withFallback(customSerializers)) +} diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala new file mode 100644 index 000000000..211a0b210 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala @@ -0,0 +1,393 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.state.scaladsl + +import com.typesafe.config.{ Config, ConfigFactory } + +import akka.actor._ +import akka.persistence.jdbc.state.{ MyPayload, OffsetSyntax } +import OffsetSyntax._ +import akka.persistence.jdbc.testkit.internal.{ H2, Postgres, SchemaType } +import akka.persistence.query.{ NoOffset, Offset, Sequence } +import akka.stream.scaladsl.Sink + +abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) extends StateSpecBase(config, schemaType) { + + "A durable state store" must withActorSystem { implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + "not load a state given an invalid persistenceId" in { + whenReady { + stateStoreString.getObject("InvalidPersistenceId") + } { v => + v.value shouldBe None + } + } + "add a valid state successfully" in { + whenReady { + stateStoreString.upsertObject("p123", 1, "a valid string", "t123") + } { v => + v shouldBe akka.Done + } + } + "support composite upsert-fetch-repeat loop" in { + whenReady { + for { + + n <- stateStoreString.upsertObject("p234", 1, "a valid string", "t123") + _ = n shouldBe akka.Done + g <- stateStoreString.getObject("p234") + _ = g.value shouldBe Some("a valid string") + u <- stateStoreString.upsertObject("p234", 2, "updated valid string", "t123") + _ = u shouldBe akka.Done + h <- stateStoreString.getObject("p234") + + } yield h + } { v => + v.value shouldBe Some("updated valid string") + } + } + "fail inserting an already existing sequence number" in { + whenReady { + (for { + + n <- stateStoreString.upsertObject("p345", 1, "a valid string", "t123") + _ = n shouldBe akka.Done + g <- stateStoreString.getObject("p345") + _ = g.value shouldBe Some("a valid string") + u <- stateStoreString.upsertObject("p345", 1, "updated valid string", "t123") + + } yield u).failed + } { e => + schemaType match { + case H2 => + e shouldBe an[org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException] + case Postgres => + e shouldBe an[org.postgresql.util.PSQLException] + case _ => ??? 
+ } + } + } + "fail inserting incorrect sequence number with 0 rows affected" in { + whenReady { + stateStoreString.upsertObject("p234", 1, "1 valid string", "t1").futureValue + stateStoreString.upsertObject("p234", 2, "2 valid string", "t1").futureValue + stateStoreString.upsertObject("p234", 3, "3 valid string", "t1").futureValue + stateStoreString.upsertObject("p234", 5, "5 valid string", "t1").failed + } { e => + e shouldBe an[IllegalStateException] + // offset should not change + stateStoreString.maxStateStoreOffset().futureValue shouldBe 3 + // sequence number should not change + stateStoreString.getObject("p234").futureValue.revision shouldBe 3 + } + } + "delete an existing state" in { + whenReady { + stateStoreString.deleteObject("p123") + } { v => + v shouldBe akka.Done + whenReady { + stateStoreString.getObject("p123") + } { v => + v.value shouldBe None + } + } + } + } + + "A durable state store with payload that needs custom serializer" must withActorSystem { implicit system => + val stateStorePayload = + new JdbcDurableStateStore[MyPayload](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + "not load a state given an invalid persistenceId" in { + whenReady { + stateStorePayload.getObject("InvalidPersistenceId") + } { v => + v.value shouldBe None + } + } + "add a valid state successfully" in { + whenReady { + stateStorePayload.upsertObject("p123", 1, MyPayload("a valid string"), "t123") + } { v => + v shouldBe akka.Done + } + } + "support composite upsert-fetch-repeat loop" in { + whenReady { + for { + + n <- stateStorePayload.upsertObject("p234", 1, MyPayload("a valid string"), "t123") + _ = n shouldBe akka.Done + g <- stateStorePayload.getObject("p234") + _ = g.value shouldBe Some(MyPayload("a valid string")) + u <- stateStorePayload.upsertObject("p234", 2, MyPayload("updated valid string"), "t123") + _ = u shouldBe akka.Done + h <- stateStorePayload.getObject("p234") + + } yield h + } { v => + v.value shouldBe Some(MyPayload("updated valid string")) + } + } + "delete an existing state" in { + whenReady { + stateStorePayload.deleteObject("p234") + } { v => + v shouldBe akka.Done + whenReady { + stateStorePayload.getObject("p234") + } { v => + v.value shouldBe None + } + } + } + } + + "A JDBC durable state store" must { + + "find all states by tag either from the beginning or from a specific offset" in withActorSystem { implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + + // fetch from beginning + upsertManyFor(stateStoreString, "p1", "t1", 1, 4) + val chgs = currentChanges("t1", NoOffset).runWith(Sink.seq).futureValue + chgs.size shouldBe 1 + chgs.map(_.offset.value).max shouldBe 4 + + // upsert more and fetch from after the last offset + upsertManyFor(stateStoreString, "p1", "t1", 5, 7) + val moreChgs = currentChanges("t1", chgs.head.offset).runWith(Sink.seq).futureValue + moreChgs.size shouldBe 1 + moreChgs.map(_.offset.value).max shouldBe 11 + + // upsert same tag, different persistence id and fetch from after the last offset + upsertManyFor(stateStoreString, "p2", "t1", 1, 3) + val otherChgs = currentChanges("t1", moreChgs.head.offset).runWith(Sink.seq).futureValue + otherChgs.size shouldBe 1 + otherChgs.map(_.offset.value).max shouldBe 14 + + // again fetch from the beginning + val cs = currentChanges("t1", NoOffset).runWith(Sink.seq).futureValue + cs.size shouldBe 2 + cs.map(_.offset.value).max shouldBe 14 + } + + 
"find the max offset after a series of upserts with multiple persistence ids" in withActorSystem { + implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + upsertRandomlyShuffledPersistenceIds(stateStoreString, List("p1", "p2", "p3"), "t1", 3) + val chgs = currentChanges("t1", NoOffset).runWith(Sink.seq).futureValue + chgs.size shouldBe 3 + chgs.map(_.offset.value).max shouldBe 9 + } + + "find all states by tags with offsets sorted and proper max and min offsets when starting offset is specified" in withActorSystem { + implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + upsertRandomlyShuffledPersistenceIds(stateStoreString, List("p1", "p2", "p3"), "t1", 3) + val chgs = stateStoreString.currentChanges("t1", Offset.sequence(7)).runWith(Sink.seq).futureValue + chgs.map(_.offset.value) shouldBe sorted + chgs.map(_.offset.value).max shouldBe 9 + chgs.map(_.offset.value).min should be > 7L + } + + "find all states by tags returning a live source with no offset specified" in withActorSystem { implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + upsertRandomlyShuffledPersistenceIds(stateStoreString, List("p1", "p2", "p3"), "t1", 3) + val source = stateStoreString.changes("t1", NoOffset) + val m = collection.mutable.ListBuffer.empty[(String, Long)] + + // trick to complete the future + val f = source + .takeWhile { e => + m += ((e.persistenceId, e.offset.value)) + e.offset.value < 12 + } + .runWith(Sink.seq) + + // more data after some delay + Thread.sleep(100) + upsertObject("p3", 4, "4 valid string", "t2").futureValue + upsertObject("p2", 4, "4 valid string", "t1").futureValue + upsertObject("p1", 4, "4 valid string", "t1").futureValue + + whenReady(f) { _ => + m.size shouldBe 2 + m.toList.map(_._2) shouldBe sorted + m.toList.map(_._2).max shouldBe 12 + } + } + + "find all states by tags returning a live source with a starting offset specified" in withActorSystem { + implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + upsertRandomlyShuffledPersistenceIds(stateStoreString, List("p1", "p2", "p3"), "t1", 3) + val source = stateStoreString.changes("t1", Sequence(4)) + val m = collection.mutable.ListBuffer.empty[(String, Long)] + + // trick to complete the future + val f = source + .takeWhile { e => + m += ((e.persistenceId, e.offset.value)) + e.offset.value < 12 + } + .runWith(Sink.seq) + + // more data after some delay + Thread.sleep(100) + upsertManyFor(stateStoreString, "p3", "t1", 4, 3) + + whenReady(f) { _ => + m.map(_._2) shouldBe sorted + m.map(_._2).min should be > 4L + m.map(_._2).max shouldBe 12 + } + } + } + + "A JDBC durable state store in the face of parallel upserts" must { + + "fetch proper values of offsets with currentChanges()" in withActorSystem { implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + + upsertParallel(stateStoreString, Set("p1", "p2", "p3"), "t1", 1000)(e).futureValue + whenReady { + currentChanges("t1", 
NoOffset).runWith(Sink.seq) + } { chgs => + chgs.map(_.offset.value) shouldBe sorted + chgs.map(_.offset.value).max shouldBe 3000 + } + + whenReady { + currentChanges("t1", Sequence(2000)).runWith(Sink.seq) + } { chgs => + chgs.map(_.offset.value) shouldBe sorted + chgs.map(_.offset.value).max shouldBe 3000 + } + } + + "fetch proper values of offsets from beginning with changes() and phased upserts - case 1" in withActorSystem { + implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + + upsertParallel(stateStoreString, Set("p1", "p2", "p3"), "t1", 5)(e).futureValue + val source = changes("t2", NoOffset) + val m = collection.mutable.ListBuffer.empty[(String, Long)] + // trick to complete the future + val f = source + .takeWhile { e => + m += ((e.persistenceId, e.offset.value)) + e.offset.value < 21 + } + .runWith(Sink.seq) + + // more data after some delay + Thread.sleep(1000) + upsertManyFor(stateStoreString, "p3", "t1", 6, 3) + Thread.sleep(1000) + upsertManyFor(stateStoreString, "p3", "t2", 9, 3) + + whenReady(f) { _ => + m.map(_._2) shouldBe sorted + m.map(_._2).min should be > 0L + m.map(_._2).max shouldBe 21 + } + } + + "fetch proper values of offsets from beginning for a larger dataset with changes() and phased upserts - case 2" in withActorSystem { + implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + + upsertParallel(stateStoreString, Set("p1", "p2", "p3"), "t1", 1000)(e).futureValue + val source = changes("t2", NoOffset) + val m = collection.mutable.ListBuffer.empty[(String, Long)] + // trick to complete the future + val f = source + .takeWhile { e => + m += ((e.persistenceId, e.offset.value)) + e.offset.value < 3060 + } + .runWith(Sink.seq) + + // more data after some delay + Thread.sleep(1000) + upsertManyFor(stateStoreString, "p3", "t1", 1001, 30) + Thread.sleep(1000) + upsertManyFor(stateStoreString, "p3", "t2", 1031, 30) + + whenReady(f) { _ => + m.map(_._2) shouldBe sorted + m.map(_._2).min should be > 0L + m.map(_._2).max shouldBe 3060 + } + } + + "fetch proper values of offsets from beginning for a larger dataset with changes() and phased upserts - case 3" in withActorSystem { + implicit system => + val stateStoreString = + new JdbcDurableStateStore[String](db, schemaTypeToProfile(schemaType), durableStateConfig, serialization) + + import stateStoreString._ + + upsertParallel(stateStoreString, Set("p1", "p2", "p3"), "t1", 5)(e).futureValue + val source = changes("t1", NoOffset) + val m = collection.mutable.ListBuffer.empty[(String, Long)] + // trick to complete the future + val f = source + .takeWhile { e => + m += ((e.persistenceId, e.offset.value)) + m.size == 2 && m.map(_._1).toSet == Set("p1", "p2") + } + .runWith(Sink.seq) + + // more data after some delay + Thread.sleep(1000) + upsertManyFor(stateStoreString, "p3", "t1", 6, 2) + Thread.sleep(1000) + // tag for p3 now changes to t2 + // hence should not be included in the query + upsertManyFor(stateStoreString, "p3", "t2", 8, 2) + + whenReady(f) { _ => + m.map(_._2) shouldBe sorted + m.map(_._2).min should be > 0L + m.map(_._2).max should be > 10L + } + } + } +} + +class H2DurableStateSpec extends JdbcDurableStateSpec(ConfigFactory.load("h2-application.conf"), H2) { + implicit lazy val system: ActorSystem = + ActorSystem("test", 
config.withFallback(customSerializers)) +} diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala new file mode 100644 index 000000000..04d3be582 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.state.scaladsl + +import com.typesafe.config.{ Config, ConfigFactory } +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time._ + +import akka.actor._ +import akka.persistence.jdbc.db.SlickDatabase +import akka.persistence.jdbc.config._ +import akka.persistence.jdbc.testkit.internal.{ H2, Postgres, SchemaType } +import akka.persistence.jdbc.util.DropCreate +import akka.serialization.SerializationExtension +import akka.util.Timeout + +abstract class StateSpecBase(val config: Config, schemaType: SchemaType) + extends AnyWordSpecLike + with BeforeAndAfterAll + with BeforeAndAfterEach + with Matchers + with ScalaFutures + with DropCreate + with DataGenerationHelper { + implicit def system: ActorSystem + + implicit lazy val e = system.dispatcher + + private[jdbc] def schemaTypeToProfile(s: SchemaType) = s match { + case H2 => slick.jdbc.H2Profile + case Postgres => slick.jdbc.PostgresProfile + case _ => ??? + } + + val customSerializers = ConfigFactory.parseString(""" + akka.actor { + serializers { + my-payload = "akka.persistence.jdbc.state.MyPayloadSerializer" + } + serialization-bindings { + "akka.persistence.jdbc.state.MyPayload" = my-payload + } + } + """) + + val customConfig = ConfigFactory.parseString(s""" + jdbc-durable-state-store { + batchSize = 200 + refreshInterval = 500.milliseconds + durable-state-sequence-retrieval { + query-delay = 100.milliseconds + } + } + """) + + lazy val cfg = customConfig + .getConfig("jdbc-durable-state-store") + .withFallback(system.settings.config.getConfig("jdbc-durable-state-store")) + .withFallback(config.getConfig("jdbc-durable-state-store")) + .withFallback(customSerializers.getConfig("akka.actor")) + + lazy val db = if (cfg.hasPath("slick.profile")) { + SlickDatabase.database(cfg, new SlickConfiguration(cfg.getConfig("slick")), "slick.db") + } else { + // needed for integration test where we use postgres-shared-db-application.conf + SlickDatabase.database( + config, + new SlickConfiguration(config.getConfig("akka-persistence-jdbc.shared-databases.slick")), + "akka-persistence-jdbc.shared-databases.slick.db") + } + + lazy val durableStateConfig = new DurableStateTableConfiguration(cfg) + lazy val serialization = SerializationExtension(system) + + implicit val defaultPatience = + PatienceConfig(timeout = Span(60, Seconds), interval = Span(10, Millis)) + + def withActorSystem(f: ExtendedActorSystem => Unit): Unit = { + implicit val system: ExtendedActorSystem = + ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers)).asInstanceOf[ExtendedActorSystem] + implicit val timeout: Timeout = Timeout(1.minute) + try { + f(system) + } finally { + system.actorSelection("system/" + "akka-persistence-jdbc-durable-state-sequence-actor").resolveOne().onComplete { + case Success(actorRef) => { + 
system.stop(actorRef)
+          Thread.sleep(1000)
+          system.log.debug(s"Is terminated: ${actorRef.isTerminated}")
+        }
+        case Failure(_) =>
+          system.log.warning("system/" + "akka-persistence-jdbc-durable-state-sequence-actor" + " does not exist")
+      }
+      system.terminate().futureValue
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    dropAndCreate(schemaType)
+    super.beforeAll()
+  }
+
+  override def beforeEach(): Unit = {
+    dropAndCreate(schemaType)
+    super.beforeEach()
+  }
+
+  override def afterAll(): Unit = {
+    db.close()
+    system.terminate().futureValue
+  }
+}
diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala
new file mode 100644
index 000000000..8d89aedf1
--- /dev/null
+++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2014 - 2019 Dennis Vriend
+ * Copyright (C) 2019 - 2021 Lightbend Inc.
+ */
+
+package akka.persistence.jdbc.state.scaladsl
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import akka.NotUsed
+import akka.actor.ExtendedActorSystem
+import akka.pattern.ask
+import akka.persistence.jdbc.config.DurableStateTableConfiguration
+import akka.persistence.query.DurableStateChange
+import akka.persistence.query.Offset
+import akka.persistence.state.scaladsl.GetObjectResult
+import akka.stream.scaladsl.Source
+import akka.testkit.TestProbe
+import akka.util.Timeout
+import slick.jdbc.{ JdbcBackend, JdbcProfile }
+import akka.serialization.Serialization
+
+object TestProbeDurableStateStoreQuery {
+  case class OffsetSequence(offset: Long, limit: Long)
+}
+
+class TestProbeDurableStateStoreQuery(
+    val probe: TestProbe,
+    db: JdbcBackend#Database,
+    profile: JdbcProfile,
+    durableStateConfig: DurableStateTableConfiguration,
+    serialization: Serialization)(override implicit val system: ExtendedActorSystem)
+    extends JdbcDurableStateStore[String](db, profile, durableStateConfig, serialization)(system) {
+
+  implicit val askTimeout = Timeout(100.millis)
+
+  override def getObject(persistenceId: String): Future[GetObjectResult[String]] = ???
+  override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[String], NotUsed] = ???
+
+  override def changes(tag: String, offset: Offset): Source[DurableStateChange[String], NotUsed] = ???
+
+  override def stateStoreSequence(offset: Long, limit: Long): Source[Long, NotUsed] = {
+    val f = probe.ref
+      .ask(TestProbeDurableStateStoreQuery.OffsetSequence(offset, limit))
+      .mapTo[scala.collection.immutable.Seq[Long]]
+
+    Source.future(f).mapConcat(identity)
+  }
+
+  override def maxStateStoreOffset(): Future[Long] = Future.successful(0)
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 46c0e1622..77dd678dd 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -9,7 +9,7 @@ object Dependencies {
   val Scala213 = "2.13.4"
   val ScalaVersions = Seq(Scala212, Scala213)
 
-  val AkkaVersion = "2.6.10"
+  val AkkaVersion = "2.6.14+76-4f721341" // "2.6.14+75-26f8eba4" // "2.6.14+73-9637bb2f" // "2.6.10"
   val AkkaBinaryVersion = AkkaVersion.take(3)
 
   val SlickVersion = "3.3.3"