diff --git a/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java b/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java index c407eb172..73fb3f8ee 100644 --- a/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java +++ b/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java @@ -36,68 +36,69 @@ import java.util.concurrent.CompletionStage; final class JavadslSnippets { - void create() { - // #create + void create() { + // #create - ActorSystem actorSystem = ActorSystem.create("example"); - CompletionStage done = SchemaUtils.createIfNotExists(actorSystem); - // #create - } + ActorSystem actorSystem = ActorSystem.create("example"); + CompletionStage done = SchemaUtils.createIfNotExists(actorSystem); + // #create + } - void readJournal() { - ActorSystem system = ActorSystem.create("example"); - // #read-journal + void readJournal() { + ActorSystem system = ActorSystem.create("example"); + // #read-journal - final JdbcReadJournal readJournal = PersistenceQuery.get(system) - .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); - // #read-journal + final JdbcReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); + // #read-journal - } + } - void persistenceIds() { - ActorSystem system = ActorSystem.create(); - // #persistence-ids + void persistenceIds() { + ActorSystem system = ActorSystem.create(); + // #persistence-ids - JdbcReadJournal readJournal = PersistenceQuery.get(system) - .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); + JdbcReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); - Source willNotCompleteTheStream = readJournal.persistenceIds(); + Source willNotCompleteTheStream = readJournal.persistenceIds(); - Source willCompleteTheStream = readJournal.currentPersistenceIds(); - // #persistence-ids - } + Source willCompleteTheStream = readJournal.currentPersistenceIds(); + // #persistence-ids + } - void eventsByPersistenceIds() { - ActorSystem system = ActorSystem.create(); + void eventsByPersistenceIds() { + ActorSystem system = ActorSystem.create(); - // #events-by-persistence-id + // #events-by-persistence-id - JdbcReadJournal readJournal = PersistenceQuery.get(system) - .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); + JdbcReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); - Source willNotCompleteTheStream = readJournal - .eventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE); + Source willNotCompleteTheStream = + readJournal.eventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE); - Source willCompleteTheStream = readJournal - .currentEventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE); - // #events-by-persistence-id - } + Source willCompleteTheStream = + readJournal.currentEventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE); + // #events-by-persistence-id + } - void eventsByTag() { - ActorSystem system = ActorSystem.create(); - // #events-by-tag - - JdbcReadJournal readJournal = PersistenceQuery.get(system) - .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); - - Source willNotCompleteTheStream = readJournal - .eventsByTag("apple", Offset.sequence(0L)); - - Source willCompleteTheStream = readJournal - .currentEventsByTag("apple", Offset.sequence(0L)); - // #events-by-tag - } + void eventsByTag() { + ActorSystem system = ActorSystem.create(); + // #events-by-tag + JdbcReadJournal readJournal = + PersistenceQuery.get(system) + .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()); + Source willNotCompleteTheStream = + readJournal.eventsByTag("apple", Offset.sequence(0L)); + Source willCompleteTheStream = + readJournal.currentEventsByTag("apple", Offset.sequence(0L)); + // #events-by-tag + } } diff --git a/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java b/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java new file mode 100644 index 000000000..c27a18d54 --- /dev/null +++ b/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java @@ -0,0 +1,159 @@ +package akka.persistence.jdbc.state; + +import java.util.concurrent.CompletionStage; +import akka.actor.ActorSystem; +import akka.Done; +import akka.NotUsed; +// #create +import akka.persistence.jdbc.testkit.javadsl.SchemaUtils; +// #create +// #jdbc-durable-state-store +import akka.persistence.state.DurableStateStoreRegistry; +import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; +// #jdbc-durable-state-store +// #get-object +import akka.persistence.state.DurableStateStoreRegistry; +import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; +import akka.persistence.state.javadsl.GetObjectResult; +// #get-object +// #upsert-get-object +import akka.persistence.state.DurableStateStoreRegistry; +import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; +import akka.persistence.state.javadsl.GetObjectResult; +// #upsert-get-object +// #delete-object +import akka.persistence.state.DurableStateStoreRegistry; +import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; +// #delete-object +// #current-changes +import akka.NotUsed; +import akka.stream.javadsl.Source; +import akka.persistence.state.DurableStateStoreRegistry; +import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; +import akka.persistence.query.DurableStateChange; +import akka.persistence.query.NoOffset; +// #current-changes +// #changes +import akka.NotUsed; +import akka.stream.javadsl.Source; +import akka.persistence.state.DurableStateStoreRegistry; +import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; +import akka.persistence.query.DurableStateChange; +import akka.persistence.query.NoOffset; +// #changes + +final class JavadslSnippets { + void create() { + // #create + + ActorSystem system = ActorSystem.create("example"); + CompletionStage done = SchemaUtils.createIfNotExists(system); + // #create + } + + void durableStatePlugin() { + ActorSystem system = ActorSystem.create("example"); + + // #jdbc-durable-state-store + + @SuppressWarnings("unchecked") + JdbcDurableStateStore store = + DurableStateStoreRegistry.get(system) + .getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc"); + // #jdbc-durable-state-store + } + + void getObject() { + ActorSystem system = ActorSystem.create("example"); + + // #get-object + + @SuppressWarnings("unchecked") + JdbcDurableStateStore store = + DurableStateStoreRegistry.get(system) + .getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc"); + + CompletionStage> futureResult = store.getObject("InvalidPersistenceId"); + try { + GetObjectResult result = futureResult.toCompletableFuture().get(); + assert !result.value().isPresent(); + } catch (Exception e) { + // handle exceptions + } + // #get-object + } + + void upsertAndGetObject() { + ActorSystem system = ActorSystem.create("example"); + + // #upsert-get-object + + @SuppressWarnings("unchecked") + JdbcDurableStateStore store = + DurableStateStoreRegistry.get(system) + .getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc"); + + CompletionStage> r = + store + .upsertObject("p234", 1, "a valid string", "t123") + .thenCompose(d -> store.getObject("p234")) + .thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123")) + .thenCompose(d -> store.getObject("p234")); + + try { + assert r.toCompletableFuture().get().value().get().equals("updated valid string"); + } catch (Exception e) { + // handle exceptions + } + // #upsert-get-object + } + + void deleteObject() { + ActorSystem system = ActorSystem.create("example"); + + // #delete-object + + @SuppressWarnings("unchecked") + JdbcDurableStateStore store = + DurableStateStoreRegistry.get(system) + .getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc"); + + CompletionStage futureResult = store.deleteObject("p123"); + try { + assert futureResult.toCompletableFuture().get().equals(Done.getInstance()); + } catch (Exception e) { + // handle exceptions + } + // #delete-object + } + + void currentChanges() { + ActorSystem system = ActorSystem.create("example"); + + // #current-changes + + @SuppressWarnings("unchecked") + JdbcDurableStateStore store = + DurableStateStoreRegistry.get(system) + .getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc"); + + Source, NotUsed> willCompleteTheStream = + store.currentChanges("tag-1", NoOffset.getInstance()); + // #current-changes + } + + void changes() { + ActorSystem system = ActorSystem.create("example"); + + // #changes + + @SuppressWarnings("unchecked") + JdbcDurableStateStore store = + DurableStateStoreRegistry.get(system) + .getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc"); + + Source, NotUsed> willNotCompleteTheStream = + store.changes("tag-1", NoOffset.getInstance()); + // #changes + } +} diff --git a/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala b/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala new file mode 100644 index 000000000..5ab0e4548 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala @@ -0,0 +1,130 @@ +package akka.persistence.jdbc.state + +import scala.concurrent.Future +import akka.actor.ActorSystem +import akka.Done +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers + +object ScaladslSnippets extends ScalaFutures with Matchers { + + def create(): Unit = { + // #create + import akka.persistence.jdbc.testkit.scaladsl.SchemaUtils + + implicit val system: ActorSystem = ActorSystem("example") + val _: Future[Done] = SchemaUtils.createIfNotExists() + // #create + } + + def durableStatePlugin(): Unit = { + implicit val system: ActorSystem = ActorSystem() + + // #jdbc-durable-state-store + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]]("akka.persistence.state.jdbc") + // #jdbc-durable-state-store + } + + def getObject(): Unit = { + implicit val system: ActorSystem = ActorSystem() + + // #get-object + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore + import akka.persistence.state.scaladsl.GetObjectResult + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]]("akka.persistence.state.jdbc") + + val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId") + futureResult.futureValue.value shouldBe None + // #get-object + } + + def upsertAndGetObject(): Unit = { + implicit val system: ActorSystem = ActorSystem() + implicit val e = system.dispatcher + + // #upsert-get-object + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore + import akka.persistence.state.scaladsl.GetObjectResult + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]]("akka.persistence.state.jdbc") + + val v: Future[GetObjectResult[String]] = + for { + n <- store.upsertObject("p234", 1, "a valid string", "t123") + _ = n shouldBe akka.Done + g <- store.getObject("p234") + _ = g.value shouldBe Some("a valid string") + u <- store.upsertObject("p234", 2, "updated valid string", "t123") + _ = u shouldBe akka.Done + h <- store.getObject("p234") + } yield h + + v.futureValue.value shouldBe Some("updated valid string") + // #upsert-get-object + } + + def deleteObject(): Unit = { + implicit val system: ActorSystem = ActorSystem() + + // #delete-object + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]]("akka.persistence.state.jdbc") + + store.deleteObject("p123").futureValue shouldBe Done + store.getObject("p123").futureValue.value shouldBe None + // #delete-object + } + + def currentChanges(): Unit = { + implicit val system: ActorSystem = ActorSystem() + + // #current-changes + import akka.NotUsed + import akka.stream.scaladsl.Source + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore + import akka.persistence.query.{ DurableStateChange, NoOffset } + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]]("akka.persistence.state.jdbc") + + val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] = + store.currentChanges("tag-1", NoOffset) + // #current-changes + } + + def changes(): Unit = { + implicit val system: ActorSystem = ActorSystem() + + // #changes + import akka.NotUsed + import akka.stream.scaladsl.Source + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore + import akka.persistence.query.{ DurableStateChange, NoOffset } + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]]("akka.persistence.state.jdbc") + + val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] = + store.changes("tag-1", NoOffset) + // #changes + } +} diff --git a/docs/src/main/paradox/durable-state-store.md b/docs/src/main/paradox/durable-state-store.md index cc924493d..b728b5b00 100644 --- a/docs/src/main/paradox/durable-state-store.md +++ b/docs/src/main/paradox/durable-state-store.md @@ -1,4 +1,72 @@ # DurableStateStore -TODO Add docs. ## How to get the DurableStateStore -TODO Add docs. + +The `DurableStateStore` for JDBC plugin is obtained through the `DurableStateStoreRegistry` extension. + +Scala +: @@snip[snip](/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala) { #jdbc-durable-state-store } + +Java +: @@snip[snip](/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java) { #jdbc-durable-state-store } + +## APIs supported by DurableStateStore + +The plugin supports the following APIs: + +### getObject + +`getObject(persistenceId)` returns `GetObjectResult(value, revision)`, where `value` is an `Option` (`Optional` in Java) +and is set to the value of the object if it exists with the passed in `persistenceId`. Otherwise `value` is empty. + +Scala +: @@snip[snip](/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala) { #get-object } + +Java +: @@snip[snip](/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java) { #get-object } + +### upsertObject + +`upsertObject(persistenceId, revision, value, tag)` inserts the record if the `persistenceId` does not exist in the +database. Or else it updates the record with the latest revision passed as `revision`. The update succeeds only if the +incoming `revision` is 1 more than the already existing one. This snippet is an example of a sequnece of `upsertObject` +and `getObject`. + +Scala +: @@snip[snip](/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala) { #upsert-get-object } + +Java +: @@snip[snip](/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java) { #upsert-get-object } + +### deleteObject + +`deleteObject(persistenceId)` deletes the record with the input `persistenceId`. + +Scala +: @@snip[snip](/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala) { #delete-object } + +Java +: @@snip[snip](/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java) { #delete-object } + +### currentChanges + +`currentChanges(tag, offset)` gets a source of the most recent changes made to objects with the given `tag` since +the passed in `offset`. This api returns changes that occurred up to when the `Source` returned by this call is materialized. + +Scala +: @@snip[snip](/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala) { #current-changes } + +Java +: @@snip[snip](/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java) { #current-changes } + +### changes + +`changes(tag, offset)` gets a source of the most recent changes made to objects with the given `tag` since +the passed in `offset`. The returned source will never terminate, it effectively watches for changes to the objects +and emits changes as they happen. + +Scala +: @@snip[snip](/core/src/test/scala/akka/persistence/jdbc/state/ScaladslSnippets.scala) { #changes } + +Java +: @@snip[snip](/core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java) { #changes } + diff --git a/project/plugins.sbt b/project/plugins.sbt index d1100d3f3..95a662100 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -3,6 +3,8 @@ addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.9.1") addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18") +addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.6.0") + // release addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.6") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")