Skip to content

Commit

Permalink
Broadcaster code review + an alternative (#4)
Browse files Browse the repository at this point in the history
* fix(core): wrap errors instead of logging them

* fix(service/header): fix lifecylces issues and make listener more minimal

* fix(service/header): core listener should stop if at least one error happens

* log(service/header): make annoying 'validation error' a warning
  • Loading branch information
Wondertan authored Jan 9, 2022
1 parent 4b2e7c6 commit b7f82c2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 69 deletions.
6 changes: 2 additions & 4 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ func NewBlockFetcher(client Client) *BlockFetcher {
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
if err != nil {
log.Errorw("fetching commit", "err", err, "height", height)
return nil, nil, err
return nil, nil, fmt.Errorf("core/fetcher: getting commit: %w", err)
}

valSet, err := f.ValidatorSet(ctx, height)
if err != nil {
log.Errorw("fetching validator set", "err", err, "height", height)
return nil, nil, err
return nil, nil, fmt.Errorf("core/fetcher: getting validator set: %w", err)
}

return commit, valSet, nil
Expand Down
105 changes: 41 additions & 64 deletions service/header/core_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package header

import (
"context"
"errors"
"fmt"

format "github.com/ipfs/go-ipld-format"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/core"
Expand Down Expand Up @@ -38,93 +40,68 @@ func NewCoreListener(p2pSub *P2PSubscriber, fetcher *core.BlockFetcher, dag form

// Start kicks off the CoreListener listener loop.
func (cl *CoreListener) Start(ctx context.Context) error {
cl.ctx, cl.cancel = context.WithCancel(context.Background())
if cl.blockSub != nil {
return fmt.Errorf("core-listener: already started")
}

sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx)
if err != nil {
return err
}

go cl.listen(ctx)
cl.ctx, cl.cancel = context.WithCancel(context.Background())
cl.blockSub = sub
go cl.listen()
return nil
}

// Stop stops the CoreListener listener loop.
func (cl *CoreListener) Stop(context.Context) error {
func (cl *CoreListener) Stop(ctx context.Context) error {
cl.cancel()
cl.ctx, cl.cancel = nil, nil
return nil
cl.blockSub = nil
return cl.fetcher.UnsubscribeNewBlockEvent(ctx)
}

// listen kicks off a loop, listening for new block events from Core,
// generating ExtendedHeaders and broadcasting them to the header-sub
// gossipsub network.
func (cl *CoreListener) listen(ctx context.Context) {
// start subscription to core node new block events
err := cl.startBlockSubscription(ctx)
if err != nil {
log.Errorw("core-listener: starting subscription to core client", "err", err)
return
}

func (cl *CoreListener) listen() {
defer log.Info("core-listener: listening stopped")
for {
select {
case <-cl.ctx.Done():
if err := cl.cancelBlockSubscription(context.Background()); err != nil {
log.Errorw("core-listener: canceling subscription to core", "err", err)
case b, ok := <-cl.blockSub:
if !ok {
return
}
return
default:
// get next header from core
next, err := cl.nextHeader(cl.ctx)

comm, vals, err := cl.fetcher.GetBlockInfo(cl.ctx, &b.Height)
if err != nil {
log.Errorw("core-listener: getting next header", "err", err)
log.Errorw("core-listener: getting block info", "err", err)
return
}
if next == nil {
log.Debugw("core-listener: no subsequent header, exiting listening service")

eh, err := MakeExtendedHeader(cl.ctx, b, comm, vals, cl.dag)
if err != nil {
log.Errorw("core-listener: making extended header", "err", err)
return
}

// broadcast new ExtendedHeader
err = cl.p2pSub.Broadcast(cl.ctx, next)
err = cl.p2pSub.Broadcast(cl.ctx, eh)
if err != nil {
log.Errorw("core-listener: broadcasting next header", "height", next.Height,
"err", err)
var pserr pubsub.ValidationError
if errors.As(err, &pserr) && pserr.Reason == pubsub.RejectValidationIgnored {
log.Warnw("core-listener: broadcasting next header", "height", eh.Height,
"err", err)
} else {
log.Errorw("core-listener: broadcasting next header", "height", eh.Height,
"err", err)
}

return
}
case <-cl.ctx.Done():
return
}
}
}

// startBlockSubscription starts the CoreListener's subscription to new block events
// from Core.
func (cl *CoreListener) startBlockSubscription(ctx context.Context) error {
if cl.blockSub != nil {
return fmt.Errorf("subscription already started")
}

var err error
cl.blockSub, err = cl.fetcher.SubscribeNewBlockEvent(ctx)

return err
}

// cancelBlockSubscription stops the CoreListener's subscription to new block events
// from Core.
func (cl *CoreListener) cancelBlockSubscription(ctx context.Context) error {
return cl.fetcher.UnsubscribeNewBlockEvent(ctx)
}

// nextHeader returns the next latest header from Core.
func (cl *CoreListener) nextHeader(ctx context.Context) (*ExtendedHeader, error) {
select {
case newBlock, ok := <-cl.blockSub:
if !ok {
return nil, fmt.Errorf("subscription closed")
}

comm, vals, err := cl.fetcher.GetBlockInfo(ctx, &newBlock.Height)
if err != nil {
return nil, err
}

return MakeExtendedHeader(ctx, newBlock, comm, vals, cl.dag)
case <-ctx.Done():
return nil, ctx.Err()
}
}
2 changes: 1 addition & 1 deletion service/header/core_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCoreListener(t *testing.T) {

err = cl.Stop(ctx)
require.NoError(t, err)
require.Nil(t, cl.ctx)
require.Nil(t, cl.blockSub)
}

func createMocknetWithTwoPubsubEndpoints(t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) {
Expand Down

0 comments on commit b7f82c2

Please sign in to comment.