
go/roothash: Parallelize history reindexes #6031

Draft

martintomazic wants to merge 4 commits into master from martin/feature/speed-up-history-reindex

Conversation

@martintomazic (Contributor) commented Feb 3, 2025

Part of #5738

What

  1. Run multiple history reindexes in parallel (see the sketch after this list):
    • The bottleneck will always be the iteration over all consensus blocks...
    • However, with n runtimes we already gain an almost n-fold speed-up, since until now the reindex ran sequentially, one runtime at a time.
  2. One additional related thing I remember from the past: we currently block starting any runtime until all runtimes have completed the history reindex. This could likely be improved as well, but probably belongs in a separate issue.

    • This was the case when you added a new runtime that needed a reindex. If the first reindex command received was for this new runtime, it blocked the ready status of all other runtimes, even ones that might be only a few heights behind... This is also fixed here. It happened the moment the reindex of the new runtime started, since it also blocked receiving events for the other runtimes, even if their histories had been reindexed before the new runtime's reindex began -> will fix the changelog.
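To illustrate point 1, here is a minimal sketch of the idea, with hypothetical names (reindexRuntime is a stand-in, not the actual oasis-core code): one reindex goroutine per tracked runtime instead of reindexing them one after another.

```go
// Minimal sketch: reindex each tracked runtime concurrently instead of
// sequentially. reindexRuntime is a hypothetical stand-in for the
// per-runtime history reindex.
package reindex

import (
	"context"
	"log"
	"sync"
)

func reindexAll(ctx context.Context, runtimeIDs []string, reindexRuntime func(context.Context, string) error) {
	var wg sync.WaitGroup
	for _, id := range runtimeIDs {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if err := reindexRuntime(ctx, id); err != nil {
				log.Printf("history reindex failed for runtime %s: %v", id, err)
			}
		}(id)
	}
	wg.Wait()
}
```

Each runtime still scans the same consensus blocks independently, which is why the consensus-block iteration remains the bottleneck.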

How to test/benchmark

I am running a local testnet with sapphire, emerald and cipher configured. I start with a synced consensus layer and delete the data/runtimes datadir to trigger a history reindex from scratch. Before this change it took around 6h (2h per runtime) to finish; now it completes in ~2h20min (in parallel).

TODO

Benchmark history reindex whilst doing consensus sync.

netlify bot commented Feb 3, 2025

Deploy Preview for oasisprotocol-oasis-core canceled.

🔨 Latest commit: e27710d
🔍 Latest deploy log: https://app.netlify.com/sites/oasisprotocol-oasis-core/deploys/67a213e5735e470008182eb8

Comment on lines +588 to +597
// Request subscription to events for this runtime.
// This has to be done only after history reindex is done, so that we don't
// receive events we are currently reindexing.
// Finally, this has to be done always, so that tracking request is not lost.
defer func() {
sc.queryCh <- app.QueryForRuntime(tr.runtimeID)
}()
@martintomazic (Contributor, Author) commented Feb 3, 2025

How it was done previously, and is still done now, is not optimal:

  1. The reindex happens once more, since we set tr.reindexDone = false a few lines below, triggering another reindex the first time this query is selected in services.go.
    • Depending on how long the second reindex takes, we may receive a few thousand (5-10k) finalized events in between, which we simply ignore later on when delivered by DeliverEvent, since they were already reindexed (see). (Update: strangely, I am unable to confirm this with logs, will dig more.)
      • A good solution was proposed: recursively call reindex until we are a predefined number of heights away from the consensus height?
  2. If we fail to reindex due to errors the second time, we are effectively skipping the reindex?
    • For every error during reindex or while processing a finalized event we simply set reindexDone=false (see)

I propose to tackle both in a follow-up / to create an issue for them.

Update:

How it was done previously, and is still done now, is not optimal:

I don't think this is actually a problem. :)

Contributor commented:

This has to be done only after history reindex is done, so that we don't receive events we are currently reindexing.

Not sure if this is fine. If the Go scheduler pauses before the query is registered, we could miss a few events. If we can also miss blocks, this is not fine.

A good solution was proposed: recursively call reindex until we are a predefined number of heights away from the consensus height?

Doesn't seem like a question.

If we fail to reindex due to errors the second time, we are effectively skipping the reindex?

Maybe, didn't check the code.

Contributor (Author) commented:

  1. This is done here, in the ProcessFinalizedEvent defer statement, by modifying the tracked runtime pointer.

As you noticed, I forgot to lock the trackedRuntimes variable. Handling this is actually quite cumbersome, since we have processFinalizedEvent (requires locking) triggering reindexBlocks, which calls back into processFinalizedEvent (requires locking) for every height. Finally we return to the original processFinalizedEvent that triggered the history reindex, which in its defer statement modifies the tracked runtime pointer it started with (its bool flag) (requires locking) :/. I wouldn't be surprised if we could actually deadlock (see the sketch below).
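A minimal sketch of the deadlock concern, with hypothetical, simplified types rather than the actual oasis-core code: sync.Mutex is not reentrant, so a lock taken in processFinalizedEvent and re-acquired through the reindexBlocks call chain blocks forever.

```go
// Minimal sketch of the re-entrancy problem: the nested call blocks on the
// mutex that the outer call still holds.
package reindex

import "sync"

type serviceClient struct {
	mu sync.Mutex
}

func (sc *serviceClient) processFinalizedEvent(height int64, reindex bool) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if reindex {
		sc.reindexBlocks(height)
	}
}

func (sc *serviceClient) reindexBlocks(currentHeight int64) {
	for h := int64(1); h <= currentHeight; h++ {
		// Deadlock: this nested call tries to take sc.mu, which the outer
		// processFinalizedEvent call still holds.
		sc.processFinalizedEvent(h, false)
	}
}
```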

Two lessons here:

  1. It is almost certain we need to refactor the existing history reindex first.
  2. I started with this PR as it looked like a very simple way of getting solid performance gains (if configured with multiple runtimes) and of fixing issue 2. from the context description. I don't think this is the case anymore.

Now that we know there is no such trivial fix (short of complicating our code further) that would give us significant performance gains (even though far from optimal), we should probably focus on the harder task directly, i.e. reindexing blocks in parallel.

In fact, we could go back to the original idea, where we change TrackRuntime to TrackRuntimes, so that we can trigger the block history reindex for all runtimes at the same time, but in parallel over different consensus height intervals (roughly along the lines of the sketch below). If we go backwards with the reindex, this could even solve 2. Alternatively, doing the history reindex for every runtime in parallel, each over all consensus blocks, seems wasteful...?
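For reference, a rough sketch of what parallelizing over consensus height intervals could look like; the names and the golang.org/x/sync/errgroup dependency are assumptions for illustration, not a concrete proposal for the oasis-core code.

```go
// Split the consensus height range into chunks and reindex each chunk
// concurrently; each chunk would index events for all tracked runtimes at
// once instead of re-scanning the chain once per runtime.
package reindex

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// reindexInterval scans consensus blocks in [start, end] and indexes roothash
// events for every tracked runtime (hypothetical signature).
type reindexInterval func(ctx context.Context, start, end int64) error

func reindexRange(ctx context.Context, first, last, chunk int64, reindex reindexInterval) error {
	g, ctx := errgroup.WithContext(ctx)
	for start := first; start <= last; start += chunk {
		start := start // capture loop variable for the closure
		end := start + chunk - 1
		if end > last {
			end = last
		}
		g.Go(func() error {
			return reindex(ctx, start, end)
		})
	}
	return g.Wait()
}
```

errgroup propagates the first error and cancels the shared context, so one failing interval stops the others.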

cc @kostko

Contributor commented:

Handling this is actually quite cumbersome

Yes, because you are doing the opposite of what the previous authors did. The service processes commands sequentially precisely so that you don't have to worry about this.

looked like a very simple way of getting solid performance gains

It didn’t look that way to me. I already mentioned that runtime optimization isn’t that relevant. What matters is optimizing the processing of consensus blocks. To achieve this, you need to introduce concurrency, which makes the task more complex rather than just a few lines of code.

focus on the harder task directly, i.e. reindexing blocks in parallel

Didn't I already say that your solution probably cannot be upgraded to reindexing blocks in parallel?

Contributor (Author) commented:

As a first step I recommend decoupling processFinalizedEvent from reindexBlocks so that DeliverEvent and DeliverCommand call their own dedicated function directly.

Personally, I like the approach suggested by @peternose, where we reindex blocks until we are reasonably close to the latest round. It is on the last round of the reindex that we should notify subscribers that the reindex has finished. This is important, as there is no guarantee we will receive further blocks for the given runtime (e.g. if it is suspended). Previously this was done via the interleaving of the methods above and subtle boolean flags, which I suggest we refactor/remove.
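As a rough illustration only (hypothetical helper functions and an assumed threshold, not the actual implementation), the suggested flow could look like this:

```go
// Keep reindexing until the indexed height is within a small threshold of the
// latest consensus height, and only then notify subscribers.
package reindex

import "context"

// closeEnough is an assumed threshold (in consensus heights) for "reasonably
// close to the latest round".
const closeEnough = 16

func reindexUntilCaughtUp(
	ctx context.Context,
	latestHeight func(context.Context) (int64, error),
	reindexTo func(context.Context, int64) (int64, error), // returns last indexed height
	notifyDone func(),
) error {
	var indexed int64
	for {
		latest, err := latestHeight(ctx)
		if err != nil {
			return err
		}
		if latest-indexed <= closeEnough {
			// Close enough to the tip: only now tell subscribers the reindex
			// has finished.
			notifyDone()
			return nil
		}
		if indexed, err = reindexTo(ctx, latest); err != nil {
			return err
		}
	}
}
```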

Likely this will also solve the minor issue outlined in the following thread.

I will work on that tomorrow, and hopefully it can be done in 1-2 days. With this out of the way we should probably focus on parallelizing the consensus height iteration; in theory this PR then becomes trivial again. :)

@martintomazic (Contributor, Author) commented Feb 4, 2025

Not sure if this is fine. If the Go scheduler pauses before the query is registered, we could miss a few events. If we can also miss blocks, this is not fine.

You are right: we call sc.GetRuntimeState(ctx, &api.RuntimeRequest) right after this is registered, meaning what I did in this PR was probably off for another reason.

Update: Actually, this part is fine and is how things work now. Since reindexDone=false, you will trigger the reindex again up to the latest height. This only prevents processing/receiving events from the CometBFT roothash during the first history reindex (since all the code is sequential, you still haven't subscribed to CometBFT roothash events here). If, however, finalized events arrive during the second reindex, you are already listening for them, so they are queued; you will ignore them here. Again, this is existing functionality.

@martintomazic force-pushed the martin/feature/speed-up-history-reindex branch from 50e7fc4 to e121476 on February 4, 2025 11:41
@martintomazic changed the title from "go/roothash: Parallelize history reindex (POC)" to "go/roothash: Parallelize history reindexes" on Feb 4, 2025
Comment on lines +540 to +542
lastRound = ev.Round
}
return lastRound, nil
@martintomazic (Contributor, Author) commented Feb 4, 2025

Added this just to be safe / 100% consistent with the previous logic. Can we have multiple finalized events for one runtime at one height? We could simplify if this is not the case, but I also don't want to make assumptions here in case this changes later on...

Contributor commented:

This is not consistent, as already pointed out.

We trigger history reindex in parallel. Previously, `DeliverCommand`
was blocking, so this also solves the problem where e.g. you add a new
runtime and it is chosen first for history reindex: even though the
other runtimes were only a few rounds from the latest height, they
would still have to wait for the new runtime's history reindex.
@martintomazic force-pushed the martin/feature/speed-up-history-reindex branch from e121476 to 61c156c on February 4, 2025 12:18
@martintomazic marked this pull request as ready for review on February 4, 2025 12:31
@peternose (Contributor) left a comment

Is this code the final solution for this PR or an intermediate hack to show how fast this works?

I would try to parallelize blocks and runtimes together, because I'm not sure this solution can be upgraded to also parallelize blocks.

return 0, fmt.Errorf("failed to process finalized event: %w", err)
}
lastRound = ev.Round
return 0, err
Contributor commented:

I think this method is special and doesn't return the zero value on error. If you want to change this, check all callers and fix all return values.

@@ -437,74 +437,13 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
)

for height := lastHeight; height <= currentHeight; height++ {
var results *cmtrpctypes.ResultBlockResults
results, err = sc.backend.GetBlockResults(sc.ctx, height)
// XXX: could soft-fail first few heights in case more heights were
Contributor commented:

I would not move this comment.

}
lastRound = round
Contributor commented:

This is not the same as before, because if there are no events, this is set to 0.

Contributor (Author) commented:

You are right; I think it is best to factor this function back, especially since I don't have to select over the context... Thanks for spotting!

func (sc *serviceClient) handleTrackRuntime(ctx context.Context, height int64, runtimeID common.Namespace, history api.BlockHistory) {
// Request to track a new runtime.
etr := sc.trackedRuntime[runtimeID]
if etr != nil {
Contributor commented:

Suggested change
if etr != nil {
if tr, ok := sc.trackedRuntime[runtimeID]; ok {

Contributor (Author) commented:

Existing code, will fix.

select {
case <-sc.ctx.Done():
logger.Info("context cancelled")
return lastRound, nil
Contributor commented:

Why don't we return an error here?

Contributor commented:

I would say that we almost always need to return an error when the ctx is canceled. If we don't, that must be a very special case and there should be an explanation in the comment.
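For illustration, a minimal sketch of the pattern being asked for (simplified, hypothetical signature, not the PR's code): propagate ctx.Err() on cancellation instead of returning nil.

```go
// On cancellation, surface ctx.Err() to the caller rather than silently
// returning a partial result.
package reindex

import "context"

func reindexBlocks(ctx context.Context, from, to int64, processHeight func(int64) (uint64, error)) (uint64, error) {
	var lastRound uint64
	for height := from; height <= to; height++ {
		select {
		case <-ctx.Done():
			// Propagate the cancellation instead of returning nil.
			return lastRound, ctx.Err()
		default:
		}
		round, err := processHeight(height)
		if err != nil {
			return lastRound, err
		}
		lastRound = round
	}
	return lastRound, nil
}
```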

logger.Info("context cancelled")
return lastRound, nil
default:
// XXX: could soft-fail first few heights in case more heights were
Contributor commented:

Happy path should not be indented.

if err != nil {
return 0, err
select {
case <-sc.ctx.Done():
Contributor commented:

This doesn't make any sense. The long-running functions in reindexBlock already use sc.ctx and will abort if canceled.

Contributor (Author) commented:

You are right: if reindexBlock returns an error, we propagate it and exit. For some reason I had in mind that we were only logging here (not returning), so I wanted to stop the iteration here. Will fix.

@@ -545,7 +551,7 @@ func (sc *serviceClient) ServiceDescriptor() tmapi.ServiceDescriptor {
func (sc *serviceClient) DeliverCommand(ctx context.Context, height int64, cmd interface{}) error {
switch c := cmd.(type) {
case *cmdTrackRuntime:
sc.handleTrackRuntime(ctx, height, c.runtimeID, c.blockHistory)
go sc.handleTrackRuntime(ctx, height, c.runtimeID, c.blockHistory)
Contributor commented:

As I already said once: if you start a goroutine, you need to have control over it. Spawning goroutines all over the place is not best practice (see the sketch after the list below).

Once you allow calling this method concurrently, synchronization becomes a problem. Your solution:

  • won't work if we send two commands requesting to track the same runtime,
  • will sometimes panic when accessing sc.trackedRuntime,
  • ...
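A minimal sketch of the two points above, with hypothetical, simplified types rather than the actual service: keep spawned goroutines under the service's control (here via a WaitGroup, so shutdown can wait for them) and guard the trackedRuntime map with a mutex so concurrent track commands cannot race.

```go
// Controlled goroutine spawning plus mutex-protected tracked-runtime map.
package reindex

import "sync"

type trackedRuntime struct {
	reindexDone bool
}

type serviceClient struct {
	mu             sync.Mutex
	wg             sync.WaitGroup
	trackedRuntime map[string]*trackedRuntime
}

func (sc *serviceClient) handleTrackRuntime(runtimeID string, reindex func(string)) {
	sc.mu.Lock()
	if _, ok := sc.trackedRuntime[runtimeID]; ok {
		// Already tracked: a second track command must not start a second reindex.
		sc.mu.Unlock()
		return
	}
	sc.trackedRuntime[runtimeID] = &trackedRuntime{}
	sc.mu.Unlock()

	// The goroutine stays under the service's control via the WaitGroup.
	sc.wg.Add(1)
	go func() {
		defer sc.wg.Done()
		reindex(runtimeID)
	}()
}

// Stop waits for all spawned reindex goroutines before shutting down.
func (sc *serviceClient) Stop() {
	sc.wg.Wait()
}
```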

Contributor (Author) commented:

Sorry, I completely forgot that I now need to protect the vars :/ Will fix :)

Contributor commented:

Is this solution even acceptable, given that runtime tracking is now done asynchronously?

Contributor (Author) commented:

This was already the case before, since calling TrackRuntime was not blocking...?

Contributor commented:

You are right.

@@ -574,7 +580,12 @@ func (sc *serviceClient) handleTrackRuntime(ctx context.Context, height int64, r
}
sc.trackedRuntime[runtimeID] = tr
// Request subscription to events for this runtime.
sc.queryCh <- app.QueryForRuntime(tr.runtimeID)
// This has to be done only after history reindex is done, so that we don't
Contributor commented:

This comment is not clear. This code needs to execute after reindex is done, but still executes if GetRuntimeState fails 🤔

@martintomazic martintomazic marked this pull request as draft February 10, 2025 23:21