go/roothash: Parallelize history reindexes #6031
base: master
Conversation
✅ Deploy Preview for oasisprotocol-oasis-core canceled.
// Request subscription to events for this runtime.
// This has to be done only after history reindex is done, so that we don't
// receive events we are currently reindexing.
// Finally, this has to be done always, so that tracking request is not lost.
defer func() {
	sc.queryCh <- app.QueryForRuntime(tr.runtimeID)
}()
How it was done previously and is done now is still not optimal:

- Reindex happens once more, since we set `tr.reindexDone = false` a few lines below, triggering another reindex after selecting from this query in `services.go` for the first time.
- Depending on how long the ~~first~~ second reindex took, we may receive ~~a few~~ 5-10k finalized events in between, which we simply ignore later on when delivered by `DeliverEvent`, since they were already reindexed (see) (update: strangely unable to confirm this with logs, will dig more). A good solution was proposed: recursively call reindex until a predefined number of heights away from the consensus height?
- If we fail to reindex due to errors for the second time, we are effectively skipping the reindex? For every error during reindex or processing a finalized event we simply set `reindexDone=false` (see).

Propose to tackle in a follow-up/create an issue for both.

Update:

> How it was done previously and is done now is still not optimal:

I don't think this is a problem actually. :)
> This has to be done only after history reindex is done, so that we don't receive events we are currently reindexing.

Not sure if this is fine. If the Go scheduler pauses before the query is registered, we could miss a few events. If we also miss blocks, that is not fine.

> A good solution was proposed to recursively call reindex until predefined number of heights away from the consensus height?

Doesn't seem like a question.

> If we fail to reindex due to errors for the second time, we are effectively skipping the reindex?

Maybe, didn't check the code.
- Is done here in the `ProcessFinalizedEvent` defer statement, modifying the tracked runtime pointer.

As you noticed, I forgot to lock the `trackedRuntime` variable. Handling this is actually quite cumbersome, since we have `processFinalizedEvent` (requires locking) triggering `reindexBlocks`, which calls back into `processFinalizedEvent` (requires locking) for every height. Finally we return to the original `processFinalizedEvent` that triggered the history reindex, which in its defer statement modifies the pointer to the tracked runtime it started with (its bool flag) (requires locking) :/. I wouldn't be surprised if we could actually deadlock.
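To make the deadlock risk concrete, here is a minimal, self-contained sketch (hypothetical `client` type and methods, not the actual oasis-core code) of the re-entrant locking pattern described above: Go's `sync.Mutex` is not reentrant, so a call chain that re-acquires a lock it already holds blocks forever.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for the service client, not the real type.
type client struct {
	mu          sync.Mutex
	reindexDone bool
}

// processFinalizedEvent locks, and when asked to reindex it triggers
// reindexBlocks, which calls back into processFinalizedEvent for every
// height -- mirroring the call chain described above.
func (c *client) processFinalizedEvent(height int64, reindex bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if reindex {
		c.reindexBlocks(height)
	}
	c.reindexDone = true
}

func (c *client) reindexBlocks(currentHeight int64) {
	for h := int64(1); h <= currentHeight; h++ {
		// Deadlock: sync.Mutex is not reentrant and the caller already holds
		// c.mu, so this nested call blocks forever on Lock().
		c.processFinalizedEvent(h, false)
	}
}

func main() {
	c := &client{}
	c.processFinalizedEvent(3, true) // never returns; Go reports "all goroutines are asleep - deadlock!"
	fmt.Println("unreachable:", c.reindexDone)
}
```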
Two lessons here:
- It is almost certain we need to refactor the existing history reindex first.
- I started with this PR as it looked like a very simple way of getting solid performance gains (if configured with multiple runtimes) + fixing issue 2 in the context description. I don't think this is the case anymore.

Now that we know there is no such trivial fix (unless we complicate our code further) that would give us significant performance gains (even though far from optimal), we should probably focus on the harder task directly, i.e. reindexing blocks in parallel.

In fact we could go back to the original idea, where we change `TrackRuntime` to `TrackRuntimes`, so that we can trigger block history reindex for all runtimes at the same time, but in parallel for different consensus height intervals? If we go backwards with the reindex, this could even solve issue 2. Alternatively, doing history reindex for every runtime in parallel, for all consensus blocks in parallel, seems wasteful...?
cc @kostko
> Handling this is actually quite cumbersome

Yes, because you are doing the opposite of what the previous authors did. The service processes commands sequentially precisely so that you don't need to worry about this stuff.

> looked like a very simple way of getting solid performance gains

It didn't look that way to me. I already mentioned that runtime optimization isn't that relevant. What matters is optimizing the processing of consensus blocks. To achieve this, you need to introduce concurrency, which makes the task more complex rather than just a few lines of code.

> focus on the harder task directly, i.e. reindexing blocks in parallel

Didn't I already say that your solution probably cannot be upgraded to reindexing blocks in parallel?
As a first step I recommend decoupling `processFinalizedEvent` from `reindexBlocks`, so that `DeliverEvent` and `DeliverCommand` call their own dedicated function directly.

Personally, I like the approach suggested by @peternose, where we reindex blocks until we are reasonably close to the latest round. It is at the last round of the reindex that we should notify subscribers that the reindex has finished. This is important, as there is no guarantee we receive further blocks for the given runtime (e.g. if it is suspended). Previously this was done via interleaving of the methods above and subtle boolean flags, which I suggest we refactor/remove.

Likely this will also solve the minor issue outlined in the following thread.

I will work on that tomorrow, and hopefully it can be done in 1-2 days. With this out of the way, we should probably focus on parallelizing the consensus height iteration; in theory this PR then becomes trivial again. :)
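For illustration, a rough sketch of the loop shape being suggested, with hypothetical names and an assumed threshold (`maxLagHeights`), not the real `reindexBlocks` signature: keep reindexing up to the current consensus height and only notify subscribers once the gap to the latest height is small enough.

```go
package roothashsketch

import "fmt"

// maxLagHeights is a hypothetical "reasonably close" threshold.
const maxLagHeights = 100

// reindexUntilCaughtUp is a sketch only: names, signature and threshold are
// illustrative, not the actual oasis-core API.
func reindexUntilCaughtUp(
	lastIndexed int64,
	latestHeight func() (int64, error), // e.g. query consensus for the latest height
	reindexRange func(from, to int64) error, // reindex one batch of heights
	notifyReindexDone func(), // tell subscribers the reindex has finished
) error {
	for {
		latest, err := latestHeight()
		if err != nil {
			return fmt.Errorf("failed to query latest height: %w", err)
		}
		if latest-lastIndexed <= maxLagHeights {
			// Close enough to the tip: declare the reindex finished here,
			// since further blocks for this runtime are not guaranteed to
			// arrive (e.g. the runtime may be suspended).
			notifyReindexDone()
			return nil
		}
		if err := reindexRange(lastIndexed+1, latest); err != nil {
			return fmt.Errorf("failed to reindex heights %d-%d: %w", lastIndexed+1, latest, err)
		}
		lastIndexed = latest
	}
}
```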
> Not sure if this is fine. If the Go scheduler pauses before the query is registered, we could miss a few events. If we also miss blocks, that is not fine.

You are right, since we call `sc.GetRuntimeState(ctx, &api.RuntimeRequest)` right after this is registered, meaning what I did in this PR was probably off for another reason.

Update: Actually, this part is fine and is how things work now. Since `reindexDone=false`, you will trigger the reindex again until the latest height. This only prevents processing/receiving events from the CometBFT roothash during the first history reindex (since all code is sequential, you still haven't subscribed to CometBFT roothash events here). If, however, finalized events happen during the second reindex, you are already listening for them, so they are being queued. You will ignore them here. Again, this is existing functionality.
Force-pushed from 50e7fc4 to e121476.
	lastRound = ev.Round
}
return lastRound, nil
Added this just to be safe / 100% consistent with the previous logic. Can we have multiple finalized events for one runtime at one height? We may simplify if this is not the case, but I also don't want to make assumptions here in case we change this later on...
This is not consistent, as already pointed out.
We trigger history reindex in parallel. Previously, `DeliverCommand` was blocking, so this also solves a problem when, e.g., you add a new runtime and this runtime is chosen first for history reindex. Even though the other runtimes were only a few rounds from the latest height, they would still have to wait for the history reindex of the new runtime.
Force-pushed from e121476 to 61c156c.
Is this code the final solution for this PR or an intermediate hack to show how fast this works?
I would try to parallelize blocks and runtimes together, because I'm not sure this solution can be upgraded to also parallelize blocks.
	return 0, fmt.Errorf("failed to process finalized event: %w", err)
}
lastRound = ev.Round
return 0, err
I think this method is special and doesn't return a zero value on error. If you want to change this, check all callers and fix all return values.
@@ -437,74 +437,13 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
	)

	for height := lastHeight; height <= currentHeight; height++ {
		var results *cmtrpctypes.ResultBlockResults
		results, err = sc.backend.GetBlockResults(sc.ctx, height)
		// XXX: could soft-fail first few heights in case more heights were
I would not move this comment.
}
lastRound = round
This is not the same as before, because if there are no events, this is set to 0.
You are right, I think it is best to factor this function back, especially since I don't have to do a select over the context... Thanks for spotting!
func (sc *serviceClient) handleTrackRuntime(ctx context.Context, height int64, runtimeID common.Namespace, history api.BlockHistory) {
	// Request to track a new runtime.
	etr := sc.trackedRuntime[runtimeID]
	if etr != nil {
Suggested change:
-	if etr != nil {
+	if tr, ok := sc.trackedRuntime[runtimeID]; ok {
Existing code, will fix.
select {
case <-sc.ctx.Done():
	logger.Info("context cancelled")
	return lastRound, nil
Why don't we return an error here?
I would say that we almost always need to return an error when the ctx is canceled. If we don't, that must be a very special case and there should be an explanation in the comment.
	logger.Info("context cancelled")
	return lastRound, nil
default:
	// XXX: could soft-fail first few heights in case more heights were
Happy path should not be indented.
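Putting the two remarks above together (propagate the context error, keep the happy path unindented), one possible shape for the loop body could look like the sketch below; `getBlockResults` and `processHeight` are hypothetical stand-ins for the real calls on `sc.backend` and the event processing.

```go
package roothashsketch

import (
	"context"
	"fmt"
)

// reindexHeights sketches only the loop shape; the callback names and types
// are illustrative, not the actual oasis-core API.
func reindexHeights(
	ctx context.Context,
	lastHeight, currentHeight int64,
	getBlockResults func(ctx context.Context, height int64) (interface{}, error),
	processHeight func(results interface{}) (round uint64, err error),
) (lastRound uint64, err error) {
	for height := lastHeight; height <= currentHeight; height++ {
		// Check for cancellation and propagate the error, so callers can
		// tell an aborted reindex from a finished one.
		if err := ctx.Err(); err != nil {
			return lastRound, fmt.Errorf("reindex aborted at height %d: %w", height, err)
		}

		// Happy path stays unindented at the top level of the loop.
		results, err := getBlockResults(ctx, height)
		if err != nil {
			return lastRound, fmt.Errorf("failed to get block results at height %d: %w", height, err)
		}
		round, err := processHeight(results)
		if err != nil {
			return lastRound, fmt.Errorf("failed to process height %d: %w", height, err)
		}
		lastRound = round
	}
	return lastRound, nil
}
```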
if err != nil { | ||
return 0, err | ||
select { | ||
case <-sc.ctx.Done(): |
This doesn't make any sense. The long-running functions in `reindexBlock` already use `sc.ctx` and will abort if canceled.
You are right: if `reindexBlock` returns an error, we propagate it and exit. For some reason I had in mind that we are only logging here (not returning), so I wanted to stop the iteration here. Will fix.
@@ -545,7 +551,7 @@ func (sc *serviceClient) ServiceDescriptor() tmapi.ServiceDescriptor {
func (sc *serviceClient) DeliverCommand(ctx context.Context, height int64, cmd interface{}) error {
	switch c := cmd.(type) {
	case *cmdTrackRuntime:
-		sc.handleTrackRuntime(ctx, height, c.runtimeID, c.blockHistory)
+		go sc.handleTrackRuntime(ctx, height, c.runtimeID, c.blockHistory)
I already said once that if you start a goroutine, you need to have control over it. Spawning routines all over the place is not the best practice.

Once you allow calling this method concurrently, synchronization becomes a problem. Your solution:
- won't work if we send two commands requesting to track the same runtime,
- will sometimes panic when accessing `sc.trackedRuntime`,
- ...
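For illustration only (hypothetical types, not the oasis-core API), one common shape that addresses both points: spawned goroutines are tracked by a `sync.WaitGroup` so the service keeps control over them, and the shared map is guarded by a mutex so two track commands for the same runtime cannot race.

```go
package roothashsketch

import "sync"

// runtimeID and trackedRuntime are hypothetical placeholders for the real types.
type runtimeID string

type trackedRuntime struct{}

type serviceClientSketch struct {
	mu              sync.Mutex
	wg              sync.WaitGroup
	trackedRuntimes map[runtimeID]*trackedRuntime
}

func newServiceClientSketch() *serviceClientSketch {
	return &serviceClientSketch{trackedRuntimes: make(map[runtimeID]*trackedRuntime)}
}

// handleTrackRuntime registers the runtime under the lock (so two track
// commands for the same runtime cannot race), and only then spawns the
// goroutine doing the slow history reindex, tracked by the WaitGroup.
func (sc *serviceClientSketch) handleTrackRuntime(id runtimeID, reindex func()) {
	sc.mu.Lock()
	if _, ok := sc.trackedRuntimes[id]; ok {
		sc.mu.Unlock()
		return // Already tracked: the second request is a no-op.
	}
	sc.trackedRuntimes[id] = &trackedRuntime{}
	sc.mu.Unlock()

	sc.wg.Add(1)
	go func() {
		defer sc.wg.Done()
		reindex()
	}()
}

// stop waits for every spawned reindex goroutine, so the service retains
// control over the goroutines it started.
func (sc *serviceClientSketch) stop() {
	sc.wg.Wait()
}
```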
Sorry, I completely forgot that I should protect the vars now :/ Will fix :)
Is this solution even acceptable, as runtime tracking is now done async?
This was the case before, as calling `TrackRuntime` was not blocking...?
You are right.
@@ -574,7 +580,12 @@ func (sc *serviceClient) handleTrackRuntime(ctx context.Context, height int64, r
	}
	sc.trackedRuntime[runtimeID] = tr
	// Request subscription to events for this runtime.
-	sc.queryCh <- app.QueryForRuntime(tr.runtimeID)
+	// This has to be done only after history reindex is done, so that we don't
This comment is not clear. This code needs to execute after the reindex is done, but it still executes if `GetRuntimeState` fails 🤔
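The 🤔 comes down to how `defer` behaves: the deferred send runs on every return path, including the early return when `GetRuntimeState` fails. A tiny standalone sketch of that behaviour (made-up names, not the real code):

```go
package main

import (
	"errors"
	"fmt"
)

// trackRuntime illustrates only the defer behaviour; the names are made up.
func trackRuntime(queryCh chan<- string, getRuntimeState func() error) error {
	// The deferred send runs on *every* return path from this function,
	// including the early error return below.
	defer func() {
		queryCh <- "query-for-runtime"
	}()

	if err := getRuntimeState(); err != nil {
		return fmt.Errorf("failed to get runtime state: %w", err)
	}
	return nil
}

func main() {
	queryCh := make(chan string, 1)
	err := trackRuntime(queryCh, func() error { return errors.New("boom") })
	fmt.Println("error:", err)       // error: failed to get runtime state: boom
	fmt.Println("query:", <-queryCh) // query-for-runtime -- sent despite the error
}
```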
Part of #5738
What

We trigger history reindex in parallel. With `n` runtimes we already gain almost an `n`× speed-up, since the reindex was sequential for each runtime until now. If the first received reindex command was for this new runtime, you were blocking the ready status of all other runtimes, which might be only a few heights behind... This is also fixed here. This was the case the moment you started reindexing the new runtime, since you were also blocking the receiving of events for other runtimes, even if their history had been reindexed before the new runtime's history reindex started -> will fix the changelog.
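A minimal sketch (hypothetical names, not the actual `DeliverCommand`/`handleTrackRuntime` code) of the general shape: each runtime's history reindex runs in its own goroutine, so with `n` similarly sized runtimes the wall-clock time approaches that of a single reindex.

```go
package roothashsketch

import "sync"

// reindexAllRuntimes runs the per-runtime history reindex concurrently and
// returns the first error encountered, if any. reindexRuntime is a stand-in
// for the real per-runtime reindex; the names are illustrative.
func reindexAllRuntimes(runtimeIDs []string, reindexRuntime func(id string) error) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, id := range runtimeIDs {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if err := reindexRuntime(id); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(id)
	}
	wg.Wait()
	return firstErr
}
```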
How to test/benchmark

I am running a local testnet with `sapphire`, `emerald` and `cipher` set. I start with synced consensus. I deleted the `data/runtimes` datadir to trigger history reindex from the start. Before this change it took around 6h (2h per runtime) to finish. Now it happens in ~2h20min (in parallel).

TODO

Benchmark history reindex whilst doing consensus sync.