Skip to content

Commit

Permalink
feat(core): add GetBlockInfo to Fetcher (#3)
Browse files Browse the repository at this point in the history
feat(service/header): make a helper to assemble ExtendedHeaders

refactor(service/header): make core exchange to rely on new helpers

refactor(service/header): make core listener to rely on new helpers

refactor(node/core): rework construction of core exchange

refactor(service): remove erasure as redudandant

chore: lint
  • Loading branch information
Wondertan authored Jan 7, 2022
1 parent 76672fb commit 570e8a2
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 118 deletions.
17 changes: 17 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ func NewBlockFetcher(client Client) *BlockFetcher {
}
}

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
if err != nil {
log.Errorw("fetching commit", "err", err, "height", height)
return nil, nil, err
}

valSet, err := f.ValidatorSet(ctx, height)
if err != nil {
log.Errorw("fetching validator set", "err", err, "height", height)
return nil, nil, err
}

return commit, valSet, nil
}

// GetBlock queries Core for a `Block` at the given height.
func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) {
res, err := f.client.Block(ctx, height)
Expand Down
16 changes: 8 additions & 8 deletions node/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func DefaultConfig() Config {
func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
return fxutil.Options(
fxutil.Provide(core.NewBlockFetcher),
fxutil.Provide(HeaderCoreExchange),
fxutil.ProvideAs(header.NewCoreExchange, new(header.Exchange)),
fxutil.Provide(HeaderCoreListener),
fxutil.ProvideIf(cfg.Remote, func() (core.Client, error) {
return RemoteClient(cfg)
Expand All @@ -53,13 +53,13 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
)
}

func HeaderCoreExchange(fetcher *core.BlockFetcher, dag format.DAGService) (*header.CoreExchange, header.Exchange) {
ce := header.NewCoreExchange(fetcher, dag)
return ce, ce
}

func HeaderCoreListener(lc fx.Lifecycle, ex *header.CoreExchange, p2pSub *header.P2PSubscriber) *header.CoreListener {
cl := header.NewCoreListener(ex, p2pSub)
func HeaderCoreListener(
lc fx.Lifecycle,
ex *core.BlockFetcher,
p2pSub *header.P2PSubscriber,
dag format.DAGService,
) *header.CoreListener {
cl := header.NewCoreListener(p2pSub, ex, dag)
lc.Append(fx.Hook{
OnStart: cl.Start,
OnStop: cl.Stop,
Expand Down
6 changes: 4 additions & 2 deletions service/block/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/ipld"

"github.com/tendermint/tendermint/pkg/da"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/service"
"github.com/celestiaorg/celestia-node/service/header"
)

Expand Down Expand Up @@ -47,7 +48,8 @@ func generateRawAndExtendedBlock(t *testing.T, store format.DAGService) *Block {
rawBlock, err := fetcher.GetBlock(context.Background(), nil)
require.NoError(t, err)
// extend block
extended, err := service.ExtendBlock(rawBlock, store)
shares, _ := rawBlock.ComputeShares()
extended, err := ipld.PutData(context.Background(), shares.RawShares(), store)
require.NoError(t, err)
// generate dah
dah := da.NewDataAvailabilityHeader(extended)
Expand Down
38 changes: 0 additions & 38 deletions service/erasure.go

This file was deleted.

71 changes: 17 additions & 54 deletions service/header/core_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import (
format "github.com/ipfs/go-ipld-format"

tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/pkg/da"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/service"
"github.com/celestiaorg/rsmt2d"
)

type CoreExchange struct {
Expand All @@ -31,11 +27,7 @@ func NewCoreExchange(fetcher *core.BlockFetcher, dag format.DAGService) *CoreExc
func (ce *CoreExchange) RequestHeader(ctx context.Context, height uint64) (*ExtendedHeader, error) {
log.Debugw("core: requesting header", "height", height)
intHeight := int64(height)
block, err := ce.fetcher.GetBlock(ctx, &intHeight)
if err != nil {
return nil, err
}
return ce.generateExtendedHeaderFromBlock(block)
return ce.getExtendedHeaderByHeight(ctx, &intHeight)
}

func (ce *CoreExchange) RequestHeaders(ctx context.Context, from, amount uint64) ([]*ExtendedHeader, error) {
Expand All @@ -59,69 +51,40 @@ func (ce *CoreExchange) RequestByHash(ctx context.Context, hash tmbytes.HexBytes
if err != nil {
return nil, err
}
extHeader, err := ce.generateExtendedHeaderFromBlock(block)

comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height)
if err != nil {
return nil, err
}

eh, err := MakeExtendedHeader(ctx, block, comm, vals, ce.shareStore)
if err != nil {
return nil, err
}

// verify hashes match
if !hashMatch(hash, extHeader.Hash().Bytes()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, extHeader.Hash().Bytes())
if !bytes.Equal(hash, eh.Hash()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, eh.Hash())
}
return extHeader, nil
}

func hashMatch(expected, got []byte) bool {
return bytes.Equal(expected, got)
return eh, nil
}

func (ce *CoreExchange) RequestHead(ctx context.Context) (*ExtendedHeader, error) {
log.Debug("core: requesting head")
chainHead, err := ce.fetcher.GetBlock(ctx, nil)
if err != nil {
return nil, err
}
return ce.generateExtendedHeaderFromBlock(chainHead)
return ce.getExtendedHeaderByHeight(ctx, nil)
}

func (ce *CoreExchange) generateExtendedHeaderFromBlock(block *types.Block) (*ExtendedHeader, error) {
// erasure code the block
extended, err := ce.extendBlockData(block)
if err != nil {
log.Errorw("computing extended data square", "err msg", err, "block height",
block.Height, "block hash", block.Hash().String())
return nil, err
}
// write block data to store
dah := da.NewDataAvailabilityHeader(extended)
log.Debugw("generated DataAvailabilityHeader", "data root", dah.Hash())
// create ExtendedHeader
commit, err := ce.fetcher.Commit(context.Background(), &block.Height)
func (ce *CoreExchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*ExtendedHeader, error) {
b, err := ce.fetcher.GetBlock(ctx, height)
if err != nil {
log.Errorw("fetching commit", "err", err, "height", block.Height)
return nil, err
}

valSet, err := ce.fetcher.ValidatorSet(context.Background(), &block.Height)
comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &b.Height)
if err != nil {
log.Errorw("fetching validator set", "err", err, "height", block.Height)
return nil, err
}
extHeader := &ExtendedHeader{
RawHeader: block.Header,
DAH: &dah,
Commit: commit,
ValidatorSet: valSet,
}
// sanity check generated ExtendedHeader
err = extHeader.ValidateBasic()
if err != nil {
return nil, err
}
return extHeader, nil
}

// extendBlockData erasure codes the given raw block's data and returns the
// erasure coded block data upon success.
func (ce *CoreExchange) extendBlockData(raw *types.Block) (*rsmt2d.ExtendedDataSquare, error) {
return service.ExtendBlock(raw, ce.shareStore)
return MakeExtendedHeader(ctx, b, comm, vals, ce.shareStore)
}
3 changes: 2 additions & 1 deletion service/header/core_exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package header

import (
"bytes"
"context"
"testing"

Expand Down Expand Up @@ -29,7 +30,7 @@ func Test_hashMatch(t *testing.T) {
expected := []byte("AE0F153556A4FA5C0B7C3BFE0BAF0EC780C031933B281A8D759BB34C1DA31C56")
mismatch := []byte("57A0D7FE69FE88B3D277C824B3ACB9B60E5E65837A802485DE5CBB278C43576A")

assert.False(t, hashMatch(expected, mismatch))
assert.False(t, bytes.Equal(expected, mismatch))
}

func createCoreFetcher(t *testing.T) *core.BlockFetcher {
Expand Down
31 changes: 21 additions & 10 deletions service/header/core_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"context"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/core"
)

// CoreListener is responsible for listening to Core for
Expand All @@ -15,19 +18,21 @@ import (
// broadcasts the new `ExtendedHeader` to the header-sub gossipsub
// network.
type CoreListener struct {
ex *CoreExchange
p2pSub *P2PSubscriber
p2pSub *P2PSubscriber
fetcher *core.BlockFetcher
dag format.DAGService

blockSub <-chan *types.Block

ctx context.Context
cancel context.CancelFunc
}

func NewCoreListener(ex *CoreExchange, p2pSub *P2PSubscriber) *CoreListener {
func NewCoreListener(p2pSub *P2PSubscriber, fetcher *core.BlockFetcher, dag format.DAGService) *CoreListener {
return &CoreListener{
ex: ex,
p2pSub: p2pSub,
p2pSub: p2pSub,
fetcher: fetcher,
dag: dag,
}
}

Expand Down Expand Up @@ -94,26 +99,32 @@ func (cl *CoreListener) startBlockSubscription(ctx context.Context) error {
}

var err error
cl.blockSub, err = cl.ex.fetcher.SubscribeNewBlockEvent(ctx)
cl.blockSub, err = cl.fetcher.SubscribeNewBlockEvent(ctx)

return err
}

// cancelBlockSubscription stops the CoreListener's subscription to new block events
// from Core.
func (cl *CoreListener) cancelBlockSubscription(ctx context.Context) error {
return cl.ex.fetcher.UnsubscribeNewBlockEvent(ctx)
return cl.fetcher.UnsubscribeNewBlockEvent(ctx)
}

// nextHeader returns the next latest header from Core.
func (cl *CoreListener) nextHeader(ctx context.Context) (*ExtendedHeader, error) {
select {
case <-ctx.Done():
return nil, nil
case newBlock, ok := <-cl.blockSub:
if !ok {
return nil, fmt.Errorf("subscription closed")
}
return cl.ex.generateExtendedHeaderFromBlock(newBlock)

comm, vals, err := cl.fetcher.GetBlockInfo(ctx, &newBlock.Height)
if err != nil {
return nil, err
}

return MakeExtendedHeader(ctx, newBlock, comm, vals, cl.dag)
case <-ctx.Done():
return nil, ctx.Err()
}
}
5 changes: 1 addition & 4 deletions service/header/core_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,12 @@ func createCoreListener(
fetcher *core.BlockFetcher,
ps *pubsub.PubSub,
) *CoreListener {
// create all sub-services necessary for CoreListener
ex := NewCoreExchange(fetcher, mdutils.Mock())

p2pSub := NewP2PSubscriber(ps, nil)
err := p2pSub.Start(context.Background())
require.NoError(t, err)
t.Cleanup(func() {
p2pSub.Stop(context.Background()) //nolint:errcheck
})

return NewCoreListener(ex, p2pSub)
return NewCoreListener(p2pSub, fetcher, mdutils.Mock())
}
28 changes: 28 additions & 0 deletions service/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package header

import (
"bytes"
"context"
"fmt"

format "github.com/ipfs/go-ipld-format"
bts "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/pkg/da"
core "github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/ipld"

header_pb "github.com/celestiaorg/celestia-node/service/header/pb"
)

Expand All @@ -31,6 +35,30 @@ type ExtendedHeader struct {
DAH *DataAvailabilityHeader `json:"dah"`
}

// MakeExtendedHeader assembles new ExtendedHeader.
func MakeExtendedHeader(
ctx context.Context,
b *core.Block,
comm *core.Commit,
vals *core.ValidatorSet,
dag format.NodeAdder,
) (*ExtendedHeader, error) {
namespacedShares, _ := b.Data.ComputeShares()
extended, err := ipld.PutData(ctx, namespacedShares.RawShares(), dag)
if err != nil {
return nil, err
}

dah := da.NewDataAvailabilityHeader(extended)
eh := &ExtendedHeader{
RawHeader: b.Header,
DAH: &dah,
Commit: comm,
ValidatorSet: vals,
}
return eh, eh.ValidateBasic()
}

// Hash returns Hash of the wrapped RawHeader.
// NOTE: It purposely overrides Hash method of RawHeader to get it directly from Commit without recomputing.
func (eh *ExtendedHeader) Hash() bts.HexBytes {
Expand Down
Loading

0 comments on commit 570e8a2

Please sign in to comment.