
Initial version of JDBC implementation of DurableState #544

Closed

Conversation

debasishg
Contributor

@debasishg debasishg commented Jun 8, 2021

References akka/akka#30277

Still a WIP, but opening a PR for early review feedback. Contains the following:

  • JDBC implementation of DurableStateStore (javadsl and scaladsl)
  • Tests, including one with a payload that uses a custom serializer
  • Tests run against an H2 database
  • JDBC implementation of DurableStateStoreQuery (javadsl and scaladsl)
  • Tests for the currentChanges and changes APIs against H2 and Postgres databases
  • DurableStateSequenceActor implemented for monitoring gaps in offsets
  • Integration tests

To run tests:

// runs against the H2 in-memory database
$ sbt "testOnly akka.persistence.jdbc.state.scaladsl.H2DurableStateSpec"
...


// Integration test: runs against a Postgres database
// Ensure a Postgres instance is running with a user named `docker` and a database named `docker`
$ sbt "It/testOnly akka.persistence.jdbc.integration.PostgresScalaJdbcDurableStateStoreQueryTest"

@debasishg debasishg requested review from patriknw and johanandren June 8, 2021 07:37
Member

@patriknw patriknw left a comment

looking good

@patriknw
Member

patriknw commented Jun 8, 2021

In upsert we have tag as an input. Should we store tags in table as well

yes, that was what I mentioned in #544 (comment), but you can do that as a second step if you like

@patriknw
Member

patriknw commented Jun 8, 2021

Also, just curious, can there be only 1 tag ?

I suggest that we only support one tag, see akka/akka#30277 (comment)

@patriknw
Member

patriknw commented Jun 8, 2021

But upsert takes a seqNr as input - shouldn't we use that input instead of adding 1 to the current value ? Also adding 1 to the current value means we need to fetch the current value. So do we need to have this fetch + update in a single transaction ?

Right, the seqNr parameter is the "new" seqNr. So it would look something like this (pseudo):

if (seqNr == 1) {
  "insert... "
} else {
  val updatedRows = "update foo ... where seq_nr = ${seqNr - 1}" 
  if (updatedRows != 1)
    throw boom
}

@debasishg
Contributor Author

@patriknw regarding the following ..

if (seqNr == 1) {
  "insert... "
} else {
  val updatedRows = "update foo ... where seq_nr = ${seqNr - 1}" 
  if (updatedRows != 1)
    throw boom
}

The above code is not transactionally safe. Someone can insert a row between the check if (seqNr == 1) and the insert itself. Slick supports insertAndUpdate, which does a transactional upsert if the underlying database supports it. I have used insertAndUpdate at the JDBC level, and at the top level I throw if seqNr is < 0. So the first-time insert and subsequent updates are taken care of at the JDBC level if you supply a valid sequence number.
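
A minimal, self-contained sketch of such a transactional upsert with Slick (the sketch uses Slick's insertOrUpdate; the table and column names are illustrative, not the PR's):

import scala.concurrent.Future
import slick.jdbc.H2Profile.api._

object UpsertSketch {
  // Illustrative table; the PR's DurableStateTable has more columns (tag, serializer ids, ...).
  final case class StateRow(persistenceId: String, seqNr: Long, statePayload: String)

  class StateTable(tag: Tag) extends Table[StateRow](tag, "durable_state") {
    def persistenceId = column[String]("persistence_id", O.PrimaryKey)
    def seqNr         = column[Long]("seq_nr")
    def statePayload  = column[String]("state_payload")
    def * = (persistenceId, seqNr, statePayload).mapTo[StateRow]
  }

  val durableState = TableQuery[StateTable]

  // insertOrUpdate compiles to a single native upsert (H2 MERGE, Postgres INSERT ... ON CONFLICT)
  // where the profile supports it, so there is no separate read-then-write window for a second
  // writer to slip into; transactionally guards the emulated fallback as well.
  def upsert(db: Database, row: StateRow): Future[Int] =
    db.run(durableState.insertOrUpdate(row).transactionally)
}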

@patriknw
Member

patriknw commented Jun 8, 2021

great if insertAndUpdate will handle it

@debasishg
Contributor Author

@patriknw I added some unit tests as well. The ones added use DurableStateStore[String]. I will be working on some more with custom serializers. Please take a look when you have some time.

@debasishg debasishg marked this pull request as ready for review June 9, 2021 11:11
Member

@patriknw patriknw left a comment

Looking good, a few comments. I'd like @octonato to review this to make sure that it conforms to existing conventions in this plugin.

} { v =>
v.value shouldBe Some("updated valid string")
}
}
Member

add another test when wrong seqNr is used (accidental concurrent writers)

Contributor Author

You mean 2 writers trying to write with the same sequence number and the same persistence id? What is the expected behavior here? Note that currently the primary key on the table is persistence_id.

Member

second write should fail if it doesn't have the expected sequence number

Contributor Author

added ..
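
For illustration, a hedged sketch of such a test; the store construction is omitted and the upsertObject(persistenceId, revision, value, tag) signature is an assumption based on this discussion, not the PR's exact API:

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

// Hypothetical spec sketch: a second writer working from stale state supplies an unexpected
// revision and must fail because the conditional update matches no row. Store construction
// and the exact upsertObject signature are assumptions, not the PR's code.
trait WrongRevisionSpec extends AnyWordSpecLike with Matchers with ScalaFutures {
  def store: akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore[String]

  "upsertObject" should {
    "fail when a concurrent writer supplies an unexpected revision" in {
      store.upsertObject("p-1", 1L, "initial value", "tag").futureValue
      // Revision 3 skips revision 2, so there is no row with revision 2 left to update.
      store.upsertObject("p-1", 3L, "stale write", "tag").failed.futureValue
    }
  }
}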

Member

@octonato octonato left a comment

Looking good @debasishg. Thanks.

I left a few comments and suggestions.

@debasishg
Contributor Author

@patriknw, @octonato Made the following changes:

  • added tag as a column in table
  • reordered column names as per suggestion
  • changed logic in upsertObject - got rid of Slick's insertAndUpdate and incorporated logic based on the sequence number (see the sketch below)
  • added JdbcDurableStateStoreProvider class
  • some refactoring in slick code
  • some class renames as per suggestion
  • added more tests as per suggestion

Please take another look ..
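
A sketch of the revision-based upsert referred to in the list above (illustrative table and column names, not the PR's actual code): revision 1 inserts a new row; any later revision issues a conditional update that must affect exactly one row, otherwise the write fails.

import scala.concurrent.{ ExecutionContext, Future }
import slick.jdbc.H2Profile.api._

object RevisionBasedUpsertSketch {
  // Illustrative schema; the PR's DurableStateTable has more columns (tag, serializer ids, ...).
  final case class StateRow(persistenceId: String, revision: Long, statePayload: String)

  class StateTable(tag: Tag) extends Table[StateRow](tag, "durable_state") {
    def persistenceId = column[String]("persistence_id", O.PrimaryKey)
    def revision      = column[Long]("revision")
    def statePayload  = column[String]("state_payload")
    def * = (persistenceId, revision, statePayload).mapTo[StateRow]
  }

  val durableState = TableQuery[StateTable]

  def upsertObject(db: Database, row: StateRow)(implicit ec: ExecutionContext): Future[Unit] = {
    val action =
      if (row.revision == 1L) durableState += row
      else
        // Optimistic locking: only the row that still holds revision - 1 may be updated.
        durableState
          .filter(s => s.persistenceId === row.persistenceId && s.revision === (row.revision - 1L))
          .map(s => (s.revision, s.statePayload))
          .update((row.revision, row.statePayload))
    db.run(action.transactionally).flatMap {
      case 1 => Future.unit // exactly one row inserted or updated
      case _ =>
        Future.failed(new IllegalStateException(
          s"Expected revision ${row.revision - 1} for [${row.persistenceId}] - likely a concurrent writer"))
    }
  }
}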

Member

@patriknw patriknw left a comment

great that the optimistic locking could be added

Member

@octonato octonato left a comment

Happy to see we got the optimistic locking in quite a straightforward way. It will be quite cool to have durable state actors.

@debasishg, have a look at how we deal with integration tests, for example here:
https://github.com/akka/akka-persistence-jdbc/blob/master/core/src/it/scala/akka/persistence/jdbc/integration/AllPersistenceIdsTest.scala

In each case we have the H2 one that runs in test, while the other DBs are in it:test and run against Docker.

In the scripts folder, we have some launch scripts to start the Docker containers for each DB individually, and then you can run sbt "It/testOnly akka.persistence.jdbc.integration.Postgres*" to pick the DB you want to test.

It will be useful to add the integration tests as soon as possible and verify that the update is behaving the way we want, i.e. returning 0 affected rows when 'failing'.

@debasishg
Contributor Author

@patriknw @octonato Just pushed an initial implementation of JdbcDurableStateStoreQuery. Could you please take a look? I will soon start writing the tests, but it would be good to have another pair of eyes on the implementation before that.

@debasishg
Contributor Author

@patriknw Done with the changes that you suggested. Please take a look when you have some time. The JdbcDurableStateStore now has the query implementation as well, and I have implemented the changes function along the same lines as eventsByTag in JdbcReadJournal, subject to some caveats that I mentioned in the last comment.

@debasishg
Contributor Author

@patriknw @octonato - Could you please take another look at the implementation? I have tests based on H2 that run OK. I will be working on the Postgres-based implementation from tomorrow. (I am on well-being day leave today.)

Member

@johanandren johanandren left a comment

Some nitpicks and one concurrency issue.

@debasishg
Contributor Author

@patriknw @johanandren @octonato Maybe another look? I now have the actor in place to monitor the monotonicity. Addressed the other issues raised in the PR feedback.

Member

@octonato octonato left a comment

@debasishg, I finally got some time to do a more detailed review.

I think we will need to re-think how to deal with missing updates as they can still happen, it seems (see my comments).

I will only be back on Tuesday and I will have a hackday, but I can make some time for a call. Otherwise, Wednesday.

Comment on lines +52 to +57
/**
* To support the changesByTag query, this actor keeps track of which rows are visible in the database.
* This is required to guarantee the changesByTag does not skip any rows in case rows with a higher (ordering) id are
* visible in the database before rows with a lower (ordering) id.
*/
class DurableStateSequenceActor[A](stateStore: JdbcDurableStateStore[A], config: DurableStateSequenceRetrievalConfig)(
Member

I don't think this will do the trick for the durable state. The JournalSequenceActor was designed to detect gaps that occur when a more recent ordering (n) shows up on the table before a previous one (n-1).

Although that could also happen for durable state, we should have something that can also detect missing updates.

If we have:

1 | abc
2 | def

And both get updated, abc becomes 3 and def becomes 4. It's possible the def transaction commits first and the query sees the table as:

1 | abc
4 | def

The stream delivers 4, the user saves the offset, and now 3 will never be delivered. We lost an update.

Contributor Author

@octonato let me first clear up the confusion in terminology. What you are calling ordering for the JournalSequenceActor is the global_offset in DurableStateSequenceActor - am I correct? So in the implementation of the actor, OrderingId refers to the ordering for the journal and the offset for the state implementation. JournalSequenceActor detects missing ordering, while DurableStateSequenceActor detects missing offsets. Then why should we have a lost update in this implementation?

Contributor Author

@octonato In other words, why should the actor not detect that 3 is missing and deliver it as well?

Member

JournalSequenceActor detects missing ordering, while DurableStateSequenceActor detects missing offsets

That is right.

I assume that Renato's example is using global_offset as the first column and persistence_id as the second column.

For that example, the DurableStateSequenceActor will detect that offset 3 is missing when it sees 4, it will query again, and will eventually find 3. 4 is not delivered until 3 has been seen.

However, there is another scenario where we have a problem. If there are several quick updates for the same persistence_id, we will see the intermediate updates as missing offsets. Those will be retried, and we give up on them after a timeout. All good in the end, but it will drastically slow down the queries.

global_offset | persistence_id | revision
---------------------------------------
17            | abc            | 1

2 quick updates of "abc" and the query will see

global_offset | persistence_id | revision
---------------------------------------
19            | abc            | 3

missing offset 18

could be more complex with several persistence_ids updated quickly

global_offset | persistence_id | revision
---------------------------------------
17            | abc            | 1
18            | def            | 1

2 quick updates of abc and 2 of def

global_offset | persistence_id | revision
---------------------------------------
20            | abc            | 3
22            | def            | 3

missing offsets 19 and 21

One way to solve that could be to also track seen persistence_id/revision pairs. In the above example, we could then see that a total of 2 revisions are missing and that matches the 2 missing offsets. Then it should be safe to say that we have seen all changes.
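
A hedged sketch of that bookkeeping (all names here are hypothetical, not code from the PR):

object MissingOffsetsSketch {
  // Hypothetical helper: decide whether the currently missing global offsets are fully explained
  // by revision jumps of persistence ids we have already seen, so the query need not wait for them.
  def gapsExplainedByRevisionJumps(
      previouslySeen: Map[String, Long], // persistenceId -> last revision seen before this batch
      currentBatch: Map[String, Long],   // persistenceId -> highest revision seen in this batch
      missingOffsets: Long): Boolean = {
    val skippedRevisions = currentBatch.iterator.map { case (pid, revision) =>
      previouslySeen.get(pid).fold(0L)(previous => math.max(0L, revision - previous - 1))
    }.sum
    // Every skipped revision consumed one global_offset value that will never show up again.
    skippedRevisions >= missingOffsets
  }
}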

Contributor Author

@patriknw perhaps I am missing something. When we query for the changes we want to get the latest offset for a persistence_id for some specified tag, and NOT all the intermediate ones. Then what is the harm in the above case if we do not see offsets 19 and 21?

Member

We can't know if 19 and 21 were written by abc and def respectively. They could have been written by xyz and then we can't ignore them.

Contributor Author

@patriknw But if they are for a different persistence_id, then the actor will eventually fetch them in the query. Maybe I am missing how the actor can miss them. If 19 and 21 are from different persistence_ids, then they will be inserted and the query will get them. If they are from the same persistence_id, the actor will still get at least the max of them.

Member

That is right with the current implementation, but my point was:

Those will be retried, and we give up on them after a timeout. All good in the end, but it will drastically slow down the queries.

We will always see such timeout delays when there are quick updates to the same persistence_id. Since that seems like a rather common scenario, it would be good to find a solution where we can avoid these delays (in most cases).

Contributor Author

ah .. ok .. makes sense now.

Member

@patriknw patriknw left a comment

just a few minor comments

refreshInterval = "1s"
# can be H2, POSTGRES
# schemaType = H2
schemaType = Postgres
Member

should it have a default value? does the journal have a default value for this?

Member

Is that so that we will only support it for Postgres and H2? Meaning, only Postgres (for prod)?

should it have a default value? does the journal have a default value for this?

I don't think so. Especially if we don't want/plan to support other DBs.

Contributor Author

@patriknw @octonato The schemaType needs to have a value since we have some platform-specific code in the implementation. The statement that fetches the next value of the sequence differs between Postgres and H2 (I haven't yet looked at other databases). Hence, when building the sequence updater, I have code like this:

lazy val sequenceNextValUpdater = durableStateTableCfg.schemaType match {
  case "H2"       => new H2SequenceNextValUpdater(profile)
  case "Postgres" => new PostgresSequenceNextValUpdater(profile)
}

Hence the config needs to have the schemaType with a default value. This can of course be changed by changing the config when required (e.g. to run tests).
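
For illustration only, the kind of difference being referred to - standard Postgres and H2 next-value syntax with a hypothetical sequence name, not necessarily the PR's exact statements:

object SequenceSyntaxSketch {
  // The per-database difference that motivates a SequenceNextValUpdater per schema type.
  // The sequence name is hypothetical.
  val postgresNextVal = "SELECT nextval('state_global_offset_seq')"     // Postgres: nextval() function
  val h2NextVal       = "SELECT NEXT VALUE FOR state_global_offset_seq" // H2: NEXT VALUE FOR syntax
}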

Member

The user should only choose the database in one place, which is the Slick profile: akka-persistence-jdbc.shared-databases.slick.profile. We should be able to look at the profile choice and pick the right SequenceNextValUpdater implementation from that.
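
A sketch of that suggestion (the updater class names come from the diff; the match itself is illustrative, not the PR's final code):

import slick.jdbc.{ H2Profile, JdbcProfile, PostgresProfile }

object SequenceNextValUpdaterSelection {
  // Pick the updater from the already-configured Slick profile instead of a separate
  // schemaType setting, so the database is chosen in exactly one place.
  def forProfile(profile: JdbcProfile): SequenceNextValUpdater =
    profile match {
      case _: H2Profile       => new H2SequenceNextValUpdater(profile)
      case _: PostgresProfile => new PostgresSequenceNextValUpdater(profile)
      case other =>
        throw new IllegalArgumentException(s"Unsupported Slick profile [${other.getClass.getName}]")
    }
}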

Contributor Author

@patriknw Now fixed. schemaType removed from reference.conf

.mapConcat(identity)
}

private[jdbc] def maxStateStoreSequence(): Future[Long] =
Member

rename to maxStateStoreOffset ?

Contributor Author

done ..

private[jdbc] def selectFromDbByPersistenceId(persistenceId: Rep[String]) =
durableStateTable.filter(_.persistenceId === persistenceId)

private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqName: String) = {
Member

rename s/seqName/seqNextValue/

Contributor Author

done ..

implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

override def scaladslDurableStateStore(): DurableStateStore[Any] =
Member

val instead of def

Contributor Author

done ..

override def scaladslDurableStateStore(): DurableStateStore[Any] =
new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization)

override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
Member

val

Contributor Author

done ..

Member

@octonato octonato left a comment

@debasishg, I left a few more comments and suggestions.

Thanks for all that work and patience. 😄


Comment on lines 8 to 11
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/
Member

something went wrong here.

Contributor Author

fixed ..

Comment on lines 34 to 36
def fromRow(serialization: Serialization)(row: DurableStateTables.DurableStateRow): Try[AnyRef] = {
serialization.deserialize(row.statePayload, row.stateSerId, row.stateSerManifest.getOrElse(""))
}
Member

Maybe better to have only one AkkaSerialization object. And we can have two fromRow methods: either remove the currying or give the fromRow methods explicit names.

This is internal API, so we don't need to be cautious when making bin-incompatible changes.

Contributor Author

Refactored to have 1 AkkaSerialization object

  • Moved AkkaSerialization to akka.persistence.jdbc. It was in akka.persistence.jdbc.journal.dao, and I didn't want state to depend on journal; hence the move.
  • Added a method fromDurableStateRow to the AkkaSerialization object, to be used from state (sketched below). Changed the name to avoid any ambiguity.
  • Removed the other AkkaSerialization object
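
A sketch of the resulting helper (the row field names come from the diff above; the object layout is illustrative):

import scala.util.Try
import akka.serialization.Serialization

// Single serialization helper shared by journal and state; DurableStateTables.DurableStateRow
// is the row type shown in the diff above.
object AkkaSerialization {
  def fromDurableStateRow(serialization: Serialization)(
      row: DurableStateTables.DurableStateRow): Try[AnyRef] =
    serialization.deserialize(row.statePayload, row.stateSerId, row.stateSerManifest.getOrElse(""))
}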

Comment on lines 15 to 34
lazy val sequenceNextValUpdater = durableStateTableCfg.schemaType match {
case "H2" => new H2SequenceNextValUpdater(profile)
case "Postgres" => new PostgresSequenceNextValUpdater(profile)
}
Member

Are we planning to add the others or have we discovered that the trick we do with the seqNum is not possible with the other DBs?

Otherwise, we can add a catch-all and throw an informative exception.

Contributor Author

We will add other DBs as well. I am sure the scheme will work for them, but since we decided to test against H2 and Postgres for the time being, I kept it like that. Added a case _ => ??? that will throw for now.

@debasishg
Contributor Author

@octonato @patriknw Incorporated all the suggestions raised so far. Another look ?

@debasishg debasishg force-pushed the wip-durable-state branch from 11b6af7 to 3bf7b7b on July 1, 2021 09:30
@debasishg
Contributor Author

Closing this PR after squashing all commits. This is going to be the feature branch for durable state implementation.

@debasishg debasishg closed this Jul 1, 2021