Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor next slot cache #12233

Merged
merged 6 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,9 +671,7 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
for {
select {
case <-ticker.C():
if err := s.fillMissingBlockPayloadId(ctx); err != nil {
log.WithError(err).Error("Could not fill missing payload ID")
}
s.lateBlockTasks(ctx)

case <-ctx.Done():
log.Debug("Context closed, exiting routine")
Expand All @@ -683,11 +681,13 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
}()
}

// fillMissingBlockPayloadId is called 4 seconds into the slot and calls FCU if we are proposing next slot
// and the cache has been missed
func (s *Service) fillMissingBlockPayloadId(ctx context.Context) error {
// lateBlockTasks is called 4 seconds into the slot and performs tasks
// related to late blocks. It emits a MissedSlot state feed event.
// It calls FCU and sets the right attributes if we are proposing next slot
// it also updates the next slot cache to deal with skipped slots.
func (s *Service) lateBlockTasks(ctx context.Context) {
if s.CurrentSlot() == s.HeadSlot() {
return nil
return
}
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.MissedSlot,
Expand All @@ -697,21 +697,31 @@ func (s *Service) fillMissingBlockPayloadId(ctx context.Context) error {
_, id, has := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(s.CurrentSlot()+1, [32]byte{} /* head root */)
// There exists proposer for next slot, but we haven't called fcu w/ payload attribute yet.
if !has || id != [8]byte{} {
return nil
return
}
s.headLock.RLock()
headBlock, err := s.headBlock()
if err != nil {
s.headLock.RUnlock()
return err
log.WithError(err).Debug("could not perform late block tasks: failed to retrieve head block")
return
}
headState := s.headState(ctx)
headRoot := s.headRoot()
headState := s.headState(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you switched the ordering from state-then-root to root-then-state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason. I was going to use the cached state as the head state, but that ended up being too hard to do in one PR.

s.headLock.RUnlock()
_, err = s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: headState,
headRoot: headRoot,
headBlock: headBlock.Block(),
})
return err
if err != nil {
log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if these Debug verbosity error logs should have higher visibility

}
lastRoot, lastState := transition.LastCachedState()
if lastState == nil {
lastRoot, lastState = headRoot[:], headState
}
if err = transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
log.WithError(err).Debug("could not update next slot state cache")
}
}
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,7 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) {
}

func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {
logHook := logTest.NewGlobal()
fc := doublylinkedtree.New()
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
Expand All @@ -2164,7 +2165,8 @@ func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {

service, err := NewService(ctx, opts...)
require.NoError(t, err)
require.NoError(t, service.fillMissingBlockPayloadId(ctx), 0)
service.lateBlockTasks(ctx)
require.LogsDoNotContain(t, logHook, "could not perform late block tasks")
}

// Helper function to simulate the block being on time or delayed for proposer
Expand Down
55 changes: 37 additions & 18 deletions beacon-chain/core/transition/trailing_slot_state_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
)

type nextSlotCache struct {
sync.RWMutex
root []byte
state state.BeaconState
sync.Mutex
prevRoot []byte
lastRoot []byte
prevState state.BeaconState
lastState state.BeaconState
}

var (
Expand All @@ -29,19 +33,22 @@ var (
})
)

// NextSlotState returns the saved state if the input root matches the root in `nextSlotCache`. Returns nil otherwise.
// This is useful to check before processing slots. With a cache hit, it will return last processed state with slot plus
// one advancement.
func NextSlotState(_ context.Context, root []byte) (state.BeaconState, error) {
nsc.RLock()
defer nsc.RUnlock()
if !bytes.Equal(root, nsc.root) || bytes.Equal(root, []byte{}) {
nextSlotCacheMiss.Inc()
return nil, nil
// NextSlotState returns the saved state for the given blockroot.
// It returns the last updated state if it matches. Otherwise it returns the previously
// updated state if it matches its root. If no root matches it returns nil
func NextSlotState(root []byte) state.BeaconState {
nsc.Lock()
defer nsc.Unlock()
if bytes.Equal(root, nsc.lastRoot) {
nextSlotCacheHit.Inc()
return nsc.lastState.Copy()
}
nextSlotCacheHit.Inc()
// Returning copied state.
return nsc.state.Copy(), nil
if bytes.Equal(root, nsc.prevRoot) {
nextSlotCacheHit.Inc()
return nsc.prevState.Copy()
}
nextSlotCacheMiss.Inc()
return nil
}

// UpdateNextSlotCache updates the `nextSlotCache`. It saves the input state after advancing the state slot by 1
Expand All @@ -52,13 +59,25 @@ func UpdateNextSlotCache(ctx context.Context, root []byte, state state.BeaconSta
copied := state.Copy()
copied, err := ProcessSlots(ctx, copied, copied.Slot()+1)
if err != nil {
return err
return errors.Wrap(err, "could not process slots")
}

nsc.Lock()
defer nsc.Unlock()

nsc.root = root
nsc.state = copied
nsc.prevRoot = nsc.lastRoot
nsc.prevState = nsc.lastState
nsc.lastRoot = bytesutil.SafeCopyBytes(root)
nsc.lastState = copied
return nil
}

// LastCachedState reports the most recently stored root and state held by the
// next slot cache. Both values are returned as copies; when the cache holds no
// last state, both results are nil.
func LastCachedState() ([]byte, state.BeaconState) {
	nsc.Lock()
	defer nsc.Unlock()
	if cached := nsc.lastState; cached != nil {
		return bytesutil.SafeCopyBytes(nsc.lastRoot), cached.Copy()
	}
	return nil, nil
}
17 changes: 11 additions & 6 deletions beacon-chain/core/transition/trailing_slot_state_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@ import (
func TestTrailingSlotState_RoundTrip(t *testing.T) {
ctx := context.Background()
r := []byte{'a'}
s, err := transition.NextSlotState(ctx, r)
require.NoError(t, err)
s := transition.NextSlotState(r)
require.Equal(t, nil, s)

s, _ = util.DeterministicGenesisState(t, 1)
require.NoError(t, transition.UpdateNextSlotCache(ctx, r, s))
s, err = transition.NextSlotState(ctx, r)
require.NoError(t, err)
s = transition.NextSlotState(r)
require.Equal(t, primitives.Slot(1), s.Slot())

lastRoot, lastState := transition.LastCachedState()
require.DeepEqual(t, r, lastRoot)
require.Equal(t, s.Slot(), lastState.Slot())

require.NoError(t, transition.UpdateNextSlotCache(ctx, r, s))
s, err = transition.NextSlotState(ctx, r)
require.NoError(t, err)
s = transition.NextSlotState(r)
require.Equal(t, primitives.Slot(2), s.Slot())

lastRoot, lastState = transition.LastCachedState()
require.DeepEqual(t, r, lastRoot)
require.Equal(t, s.Slot(), lastState.Slot())
}
20 changes: 5 additions & 15 deletions beacon-chain/core/transition/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,15 @@ func ProcessSlotsUsingNextSlotCache(
ctx, span := trace.StartSpan(ctx, "core.state.ProcessSlotsUsingNextSlotCache")
defer span.End()

// Check whether the parent state has been advanced by 1 slot in next slot cache.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very simple function that was overly commented, making it less readable. I removed some useless variables and most comments.

nextSlotState, err := NextSlotState(ctx, parentRoot)
if err != nil {
return nil, err
}
cachedStateExists := nextSlotState != nil && !nextSlotState.IsNil()
// If the next slot state is not nil (i.e. cache hit).
// We replace next slot state with parent state.
if cachedStateExists {
nextSlotState := NextSlotState(parentRoot)
if nextSlotState != nil {
parentState = nextSlotState
}

// In the event our cached state has advanced our
// state to the desired slot, we exit early.
if cachedStateExists && parentState.Slot() == slot {
if parentState.Slot() == slot {
return parentState, nil
}
// Since next slot cache only advances state by 1 slot,
// we check if there's more slots that need to process.

var err error
parentState, err = ProcessSlots(ctx, parentState, slot)
if err != nil {
return nil, errors.Wrap(err, "could not process slots")
Expand Down