Initial version of JDBC implementation of DurableState #544
Conversation
looking good
core/src/main/scala/akka/persistence/jdbc/state/javadsl/DurableStateStore.scala
yes, that was what I mentioned in #544 (comment), but you can do that as a second step if you like
I suggest that we only support one tag, see akka/akka#30277 (comment)
Right, the seqNr parameter is the "new" seqNr. So it would look something like this (pseudo):

@patriknw regarding the following ..

```scala
if (seqNr == 1) {
  "insert ..."
} else {
  val updatedRows = "update foo ... where seq_nr = ${seqNr - 1}"
  if (updatedRows != 1)
    throw boom
}
```

The above code is not transactionally safe. Someone can insert a row between the check
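The transactionally safe variant can be sketched as a single conditional UPDATE: the revision check is part of the statement's WHERE clause, so there is no window between the check and the write. This is an illustrative sketch only; table and column names are assumptions, not the plugin's actual schema.

```scala
// Illustrative compare-and-set upsert (hypothetical table/column names).
// The expected previous seq_nr sits in the UPDATE's WHERE clause, so the
// check and the write form one atomic statement; a concurrent writer makes
// executeUpdate() report 0 affected rows instead of racing a prior read.
import java.sql.Connection

def upsertState(con: Connection, persistenceId: String, payload: Array[Byte], seqNr: Long): Unit =
  if (seqNr == 1L) {
    // First revision: the primary key on persistence_id rejects a
    // concurrent insert of the same id.
    val ins = con.prepareStatement(
      "INSERT INTO durable_state (persistence_id, seq_nr, state_payload) VALUES (?, ?, ?)")
    try {
      ins.setString(1, persistenceId); ins.setLong(2, seqNr); ins.setBytes(3, payload)
      ins.executeUpdate()
    } finally ins.close()
  } else {
    val upd = con.prepareStatement(
      "UPDATE durable_state SET state_payload = ?, seq_nr = ? " +
        "WHERE persistence_id = ? AND seq_nr = ?")
    try {
      upd.setBytes(1, payload); upd.setLong(2, seqNr)
      upd.setString(3, persistenceId); upd.setLong(4, seqNr - 1)
      if (upd.executeUpdate() != 1)
        throw new IllegalStateException(
          s"Write of $persistenceId rejected: expected previous seq_nr ${seqNr - 1} was not found")
    } finally upd.close()
  }
```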
great if
@patriknw I added some unit tests as well. The ones added use
Looking good, a few comments. I'd like @octonato to review this to make sure that it conforms to existing conventions in this plugin.
core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala
core/src/main/scala/akka/persistence/jdbc/state/javadsl/DurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
```scala
} { v =>
  v.value shouldBe Some("updated valid string")
}
}
```
add another test when wrong seqNr is used (accidental concurrent writers)
You mean 2 writers trying to write with the same sequence number and same persistence id? What is the expected behavior here? Note currently the primary key on the table is persistence_id ..
second write should fail if it doesn't have the expected sequence number
added ..
Looking good @debasishg. Thanks.
I left a few comments and suggestions.
core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
@patriknw, @octonato Made the following changes:
Please take another look ..
great that the optimistic locking could be added
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/test/scala/akka/persistence/jdbc/state/JdbcDurableStateSpec.scala
Happy to see we got the optimistic locking in a quite straightforward way. That will be quite cool to have for durable state actors.
@debasishg, have a look at how we deal with integration tests, for example here:
https://github.com/akka/akka-persistence-jdbc/blob/master/core/src/it/scala/akka/persistence/jdbc/integration/AllPersistenceIdsTest.scala
Each time we have the h2 one that is run in `test`, and then the other DBs are in `it:test` and they run with docker.
In the scripts folder, we have some launch scripts to start the dockers for each DB individually and then you can run `sbt "It/testOnly akka.persistence.jdbc.integration.Postgres*"` to pick the DB you want to test.
It will be useful to add the integration tests as soon as possible and verify if the update is behaving the way we want, i.e. returning 0 affected rows when 'failing'.
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreQuery.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreQuery.scala
@patriknw Done the changes that you suggested. Please take a look when you have some time. The
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreQuery.scala
Some nitpicks and one concurrency issue.
core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala
core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/package.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
@patriknw @johanandren @octonato Maybe another look? I have the actor in place now to monitor the monotonicity. Addressed other issues raised in the PR feedback.
@debasishg, I finally got some time to do a more detailed review.
I think we will need to re-think how to deal with missing updates as they can still happen, it seems (see my comments).
I will only be back on Tuesday and I will have a hackday, but I can make some time for a call. Otherwise, Wednesday.
core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
```scala
/**
 * To support the changesByTag query, this actor keeps track of which rows are visible in the database.
 * This is required to guarantee the changesByTag does not skip any rows in case rows with a higher (ordering) id are
 * visible in the database before rows with a lower (ordering) id.
 */
class DurableStateSequenceActor[A](stateStore: JdbcDurableStateStore[A], config: DurableStateSequenceRetrievalConfig)(
```
I don't think this will do the trick for the durable state. The JournalSequenceActor was designed to detect gaps that occur when a more recent ordering (n) shows up in the table before a previous one (n-1).
Although that could also happen for durable state, we should have something that can also detect missing updates.
If we have:

```
1 | abc
2 | def
```

and both get updated, abc becomes 3 and def becomes 4, it's possible the def transaction commits first and the query sees the table as:

```
1 | abc
4 | def
```

The stream delivers 4, the user saves the offset, and now 3 will never be delivered. We lost an update.
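The scenario above can be modeled with a toy reader (illustrative code, not the plugin's implementation): a reader that jumps straight to the highest visible offset loses the late-committing row, while a gap-aware reader only advances through contiguous offsets and picks up the stragglers on a later poll.

```scala
// Naive reader: blindly advance to the highest visible offset.
def naiveRead(visible: Set[Long], savedOffset: Long): Long =
  if (visible.isEmpty) savedOffset else math.max(savedOffset, visible.max)

// Gap-aware reader: advance only through contiguous offsets and stop at the
// first gap, so a late-committing offset is still delivered later.
def gapAwareRead(visible: Set[Long], savedOffset: Long): Long = {
  var next = savedOffset + 1
  while (visible.contains(next)) next += 1
  next - 1
}

// Offsets 3 and 4 are written concurrently, but only 4 is visible so far.
val visible = Set(1L, 2L, 4L)
val naive = naiveRead(visible, 0L)    // 4: offset 3 is skipped and lost
val safe  = gapAwareRead(visible, 0L) // 2: waits until 3 becomes visible
```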
@octonato let me first clear up the confusion in terminology. What you are calling `ordering` for the JournalSequenceActor is the `global_offset` in DurableStateSequenceActor - am I correct? So in the implementation of the actor, `OrderingId` refers to `ordering` for the journal and the `offset` for the state implementation. JournalSequenceActor detects missing `ordering`, while DurableStateSequenceActor detects missing `offsets`. Then why should we have a lost update in this implementation?
@octonato In other words, why should the actor not detect that 3 is missing and deliver it as well?
> JournalSequenceActor detects missing ordering, while DurableStateSequenceActor detects missing offsets

That is right.
I assume that Renato's example is using global_offset as the first column and persistence_id as the second column.
For that example, the DurableStateSequenceActor will detect that offset 3 is missing when it sees 4, it will query again, and will eventually find 3. 4 is not delivered until 3 has been seen.
However, there is another scenario where we have a problem. If there are several quick updates for the same persistence_id, we will see the intermediate updates as missing offsets. Those will be retried and we give up on them after a timeout. All good in the end, but it will drastically slow down the queries.

```
global_offset | persistence_id | revision
-----------------------------------------
17            | abc            | 1
```

2 quick updates of "abc" and the query will see:

```
global_offset | persistence_id | revision
-----------------------------------------
19            | abc            | 3
```

missing offset 18

It could be more complex with several persistence_ids updated quickly:

```
global_offset | persistence_id | revision
-----------------------------------------
17            | abc            | 1
18            | def            | 1
```

2 quick updates of abc and 2 of def:

```
global_offset | persistence_id | revision
-----------------------------------------
20            | abc            | 3
22            | def            | 3
```

missing offsets 19 and 21

One way to solve that could be to also track seen persistence_id/revision pairs. In the above example, we could then see that a total of 2 revisions are missing and that matches the 2 missing offsets. Then it should be safe to say that we have seen all changes.
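A minimal sketch of that bookkeeping idea (names and data shapes are assumptions, not plugin API): count the revisions known to have been skipped per persistence id, and compare that against the number of missing offsets in the batch. When they match, every gap is explained and no timeout wait is needed.

```scala
// A row as seen by the changes query: its global offset, persistence id,
// and revision. Hypothetical shape for illustration only.
final case class Seen(offset: Long, persistenceId: String, revision: Long)

// Safe to advance past the gaps when every missing offset is accounted for
// by a skipped revision of a persistence id present in the batch.
def safeToAdvance(prevRevisions: Map[String, Long], lastOffset: Long, batch: Seq[Seen]): Boolean =
  batch.isEmpty || {
    val maxOffset = batch.map(_.offset).max
    val missingOffsets = (maxOffset - lastOffset) - batch.size
    val skippedRevisions =
      batch.map(s => s.revision - prevRevisions.getOrElse(s.persistenceId, 0L) - 1).sum
    missingOffsets == skippedRevisions
  }

// Patrik's example: abc and def each jump from revision 1 to 3, producing
// offsets 20 and 22 with 19 and 21 missing. 2 missing offsets match
// 2 skipped revisions, so the gaps are explained.
val safe = safeToAdvance(
  Map("abc" -> 1L, "def" -> 1L), 18L,
  Seq(Seen(20L, "abc", 3L), Seen(22L, "def", 3L)))
```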
@patriknw perhaps I am missing something. When we query for the changes, we want to get the latest offset for a persistence_id for some specified tag, and NOT all the intermediate ones. Then what is the harm in the above case if we do not see offsets 19 and 21?
We can't know if 19 and 21 were written by abc and def respectively. They could have been written by xyz and then we can't ignore them.
@patriknw But if they are of a different persistence_id, then the actor will eventually fetch them in the query. Maybe I am missing how the actor can miss them .. If 19 and 21 are from different persistence_ids then they will be inserted and the query will get them. If they are from the same persistence id, then the actor will still get at least the max of them.
That is right, with the current implementation, but my point was:

> Those will be retried and we give up on them after a timeout. All good in the end, but it will drastically slow down the queries.

We will always see such timeout delays when there are quick updates to the same persistence_id. Since that seems like a rather common scenario, it would be good to find a solution where we can avoid these delays (in most cases).
ah .. ok .. makes sense now.
just a few minor comments
```
refreshInterval = "1s"
# can be H2, POSTGRES
# schemaType = H2
schemaType = Postgres
```
should it have a default value? does the journal have a default value for this?
Is that so that we will only support it for Postgres and H2? Meaning, only Postgres (for prod)?

> should it have a default value? does the journal have a default value for this?

I don't think so. Especially if we don't want/plan to support other DBs.
@patriknw @octonato The `schemaType` needs to have a value since we have some platform specific code in the implementation. The statement that fetches the next value of the sequence differs between Postgres and H2 (haven't yet looked at other implementations though). Hence when building the sequence updater I have code like this:

```scala
lazy val sequenceNextValUpdater = durableStateTableCfg.schemaType match {
  case "H2"       => new H2SequenceNextValUpdater(profile)
  case "Postgres" => new PostgresSequenceNextValUpdater(profile)
}
```

Hence the config needs to have the `schemaType` with a default value. This can of course be changed by changing the config when required (e.g. to run tests).
The user should only choose the database in one place, which is the Slick profile: `akka-persistence-jdbc.shared-databases.slick.profile`. We should be able to look at the profile choice and pick the right SequenceNextValUpdater implementation from that.
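That suggestion can be sketched as a match on the Slick profile type rather than a string setting. This assumes the `SequenceNextValUpdater` trait and the two updater classes introduced in this PR; the function name and error message are illustrative.

```scala
// Derive the sequence updater from the configured Slick profile instead of
// a separate schemaType config setting. H2Profile and PostgresProfile are
// Slick's own profile traits, so a type match suffices.
import slick.jdbc.{ H2Profile, JdbcProfile, PostgresProfile }

def sequenceNextValUpdater(profile: JdbcProfile): SequenceNextValUpdater =
  profile match {
    case _: H2Profile       => new H2SequenceNextValUpdater(profile)
    case _: PostgresProfile => new PostgresSequenceNextValUpdater(profile)
    case other =>
      // Informative catch-all for profiles not yet supported.
      throw new IllegalArgumentException(s"Durable state is not yet supported for profile $other")
  }
```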
@patriknw Now fixed. `schemaType` removed from `reference.conf`.
```scala
    .mapConcat(identity)
}

private[jdbc] def maxStateStoreSequence(): Future[Long] =
```
rename to maxStateStoreOffset ?
done ..
```scala
private[jdbc] def selectFromDbByPersistenceId(persistenceId: Rep[String]) =
  durableStateTable.filter(_.persistenceId === persistenceId)

private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqName: String) = {
```
rename s/seqName/seqNextValue/
done ..
```scala
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer

override def scaladslDurableStateStore(): DurableStateStore[Any] =
```
`val` instead of `def`
done ..
```scala
override def scaladslDurableStateStore(): DurableStateStore[Any] =
  new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization)

override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
```
val
done ..
@debasishg, I left a few more comments and suggestions.
Thanks for all that work and patience. 😄
```scala
/*
 * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
 * Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
 */
```
something went wrong here.
fixed ..
```scala
def fromRow(serialization: Serialization)(row: DurableStateTables.DurableStateRow): Try[AnyRef] = {
  serialization.deserialize(row.statePayload, row.stateSerId, row.stateSerManifest.getOrElse(""))
}
```
Maybe better to have only one `AkkaSerialization` object. And we can have two `fromRow` methods, either removing the currying or having explicit names for the `fromRow` methods.
This is internal API, so we don't need to be cautious when making bin-incompatible changes.
Refactored to have one `AkkaSerialization` object:

- Moved `AkkaSerialization` to `akka.persistence.jdbc`. It was in `akka.persistence.jdbc.journal.dao`. Didn't want to have `state` depend on `journal`, hence the move.
- Added a method `fromDurableStateRow` to the `AkkaSerialization` object to be used from `state`. Changed the name to avoid any ambiguity.
- Removed the other `AkkaSerialization` object.
```scala
lazy val sequenceNextValUpdater = durableStateTableCfg.schemaType match {
  case "H2"       => new H2SequenceNextValUpdater(profile)
  case "Postgres" => new PostgresSequenceNextValUpdater(profile)
}
```
Are we planning to add the others or have we discovered that the trick we do with the seqNum is not possible with the other DBs?
Otherwise, we can add a catch-all and throw an informative exception.
We will add other dbs as well. I am sure the scheme will work for other dbs. But since we decided to test for H2 and Postgres for the time being, I kept it like that. Added a `case _ => ???` that will throw for now.
Force-pushed from 11b6af7 to 3bf7b7b (Compare)
Closing this PR after squashing all commits. This is going to be the feature branch for durable state implementation.
References akka/akka#30277
Still a WIP but opening a PR for early review feedback. Contains the following:

- `DurableStateStore` (javadsl and scaladsl)
- `DurableStateStoreQuery` (javadsl and scaladsl)
- `currentChanges` and `changes` APIs against H2 and Postgres database
- `DurableStateSequenceActor` implemented for monitoring gaps in offsets

To run tests: