Skip to content

Commit

Permalink
services/header: Refactor HeaderService to be responsible for broad…
Browse files Browse the repository at this point in the history
…casting new `ExtendedHeader`s to the gossipsub network (celestiaorg#327)

* core listener

* set ctx and cancelfunc to nil after stopped service

* changelog

* changelog

remove todod

* feat(core): add GetBlockInfo to Fetcher (#3)

feat(service/header): make a helper to assemble ExtendedHeaders

refactor(service/header): make core exchange to rely on new helpers

refactor(service/header): make core listener to rely on new helpers

refactor(node/core): rework construction of core exchange

refactor(service): remove erasure as redudandant

chore: lint

* chore: lint

* Broadcaster code review + an alternative  (#4)

* fix(core): wrap errors instead of logging them

* fix(service/header): fix lifecylces issues and make listener more minimal

* fix(service/header): core listener should stop if at least one error happens

* log(service/header): make annoying 'validation error' a warning

* use commit height in GetBlockInfo

Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
  • Loading branch information
2 people authored and Bidon15 committed Jan 11, 2022
1 parent 9ca1789 commit da19647
Show file tree
Hide file tree
Showing 16 changed files with 318 additions and 140 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG-PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ Month, DD, YYYY

### IMPROVEMENTS

- [services/header: Refactor `HeaderService` to be responsible for broadcasting new `ExtendedHeader`s to the gossipsub network](https://github.com/celestiaorg/celestia-node/pull/327) [@renaynay](https://github.com/renaynay)
- [cmd: introduce Env - an Environment for CLI commands #313](https://github.com/celestiaorg/celestia-node/pull/313) [@Wondertan](https://github.com/Wondertan)
- [chore: bump deps #297](https://github.com/celestiaorg/celestia-node/pull/297) [@Wondertan](https://github.com/Wondertan)
- [workflows/lint: update golangci-lint to v1.43 #308](https://github.com/celestiaorg/celestia-node/pull/308) [@Wondertan](https://github.com/Wondertan)
- [feat(node): extract overrides from Config into Settings #292](https://github.com/celestiaorg/celestia-node/pull/292) [@Wondertan](https://github.com/Wondertan)
- [node: Adding WithHost options to settings section #301](https://github.com/celestiaorg/celestia-node/pull/301) [@Bidon15](https://github.com/Bidon15)
- [node: Adding WithCoreClient option #305](https://github.com/celestiaorg/celestia-node/pull/305) [@Bidon15](https://github.com/Bidon15)
- [refactor(services/header): Refactor `HeaderService` to only manage its sub-services' lifecycles #317](https://github.com/celestiaorg/celestia-node/pull/317) [@renaynay](https://github.com/renaynay)
- [service/header: Refactor `HeaderService` to only manage its sub-services' lifecycles #317](https://github.com/celestiaorg/celestia-node/pull/317) [@renaynay](https://github.com/renaynay)
- [docker] Created `docker/` dir with `Dockerfile` and `entrypoint.sh` script.
- [chore(share): handle rows concurrently in GetSharesByNamespace #241](https://github.com/celestiaorg/celestia-node/pull/241) [@vgonkivs](https://github.com/vgonkivs)

Expand Down
20 changes: 20 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ func NewBlockFetcher(client Client) *BlockFetcher {
}
}

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting commit: %w", err)
}

// If a nil `height` is given as a parameter, there is a chance
// that a new block could be produced between getting the latest
// commit and getting the latest validator set. Therefore, it is
// best to get the validator set at the latest commit's height to
// prevent this potential inconsistency.
valSet, err := f.ValidatorSet(ctx, &commit.Height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting validator set: %w", err)
}

return commit, valSet, nil
}

// GetBlock queries Core for a `Block` at the given height.
func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) {
res, err := f.client.Block(ctx, height)
Expand Down
9 changes: 9 additions & 0 deletions core/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func StartMockNode() *node.Node {
return rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
}

func EphemeralMockEmbeddedClient(t *testing.T) Client {
nd := StartMockNode()
t.Cleanup(func() {
nd.Stop() //nolint:errcheck
rpctest.StopTendermint(nd)
})
return NewEmbeddedFromNode(nd)
}

// MockEmbeddedClient returns a started mock Core Client.
func MockEmbeddedClient() Client {
return NewEmbeddedFromNode(StartMockNode())
Expand Down
4 changes: 4 additions & 0 deletions node/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestNewBridge(t *testing.T) {
require.NotNil(t, node)
require.NotNil(t, node.Config)
require.NotNil(t, node.Host)
require.NotNil(t, node.HeaderServ)
require.Nil(t, node.BlockServ)
assert.NotZero(t, node.Type)
}
Expand Down Expand Up @@ -104,6 +105,9 @@ func TestBridge_WithRemoteCore(t *testing.T) {

store := MockStore(t, DefaultConfig(Bridge))
remoteCore, protocol, ip := core.StartRemoteCore()
t.Cleanup(func() {
remoteCore.Stop() // nolint:errcheck
})
require.NotNil(t, remoteCore)
assert.True(t, remoteCore.IsRunning())

Expand Down
2 changes: 1 addition & 1 deletion node/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func lightComponents(cfg *Config, store Store) fxutil.Option {
)
}

// fullComponents keeps all the components as DI options required to built a Full Node.
// fullComponents keeps all the components as DI options required to build a Full Node.
func bridgeComponents(cfg *Config, store Store) fxutil.Option {
return fxutil.Options(
fxutil.Supply(Bridge),
Expand Down
57 changes: 23 additions & 34 deletions node/core/core.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package core

import (
"context"

format "github.com/ipfs/go-ipld-format"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
Expand Down Expand Up @@ -31,24 +30,14 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
return fxutil.Options(
fxutil.Provide(core.NewBlockFetcher),
fxutil.ProvideAs(header.NewCoreExchange, new(header.Exchange)),
fxutil.ProvideIf(cfg.Remote, func(lc fx.Lifecycle) (core.Client, error) {
client, err := RemoteClient(cfg)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
return client.Start()
},
OnStop: func(_ context.Context) error {
return client.Stop()
},
})

return client, nil
fxutil.Provide(HeaderCoreListener),
fxutil.ProvideIf(cfg.Remote, func() (core.Client, error) {
return RemoteClient(cfg)
}),
fxutil.InvokeIf(cfg.Remote, func(c core.Client) error {
return c.Start()
}),
fxutil.ProvideIf(!cfg.Remote, func(lc fx.Lifecycle) (core.Client, error) {
fxutil.ProvideIf(!cfg.Remote, func() (core.Client, error) {
store, err := loader()
if err != nil {
return nil, err
Expand All @@ -59,25 +48,25 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
return nil, err
}

client, err := core.NewEmbedded(cfg)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
return client.Start()
},
OnStop: func(_ context.Context) error {
return client.Stop()
},
})

return client, nil
return core.NewEmbedded(cfg)
}),
)
}

func HeaderCoreListener(
lc fx.Lifecycle,
ex *core.BlockFetcher,
p2pSub *header.P2PSubscriber,
dag format.DAGService,
) *header.CoreListener {
cl := header.NewCoreListener(p2pSub, ex, dag)
lc.Append(fx.Hook{
OnStart: cl.Start,
OnStop: cl.Stop,
})
return cl
}

// RemoteClient provides a constructor for core.Client over RPC.
func RemoteClient(cfg Config) (core.Client, error) {
return core.NewRemote(cfg.RemoteConfig.Protocol, cfg.RemoteConfig.RemoteAddr)
Expand Down
6 changes: 4 additions & 2 deletions service/block/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/ipld"

"github.com/tendermint/tendermint/pkg/da"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/service"
"github.com/celestiaorg/celestia-node/service/header"
)

Expand Down Expand Up @@ -47,7 +48,8 @@ func generateRawAndExtendedBlock(t *testing.T, store format.DAGService) *Block {
rawBlock, err := fetcher.GetBlock(context.Background(), nil)
require.NoError(t, err)
// extend block
extended, err := service.ExtendBlock(rawBlock, store)
shares, _ := rawBlock.ComputeShares()
extended, err := ipld.PutData(context.Background(), shares.RawShares(), store)
require.NoError(t, err)
// generate dah
dah := da.NewDataAvailabilityHeader(extended)
Expand Down
38 changes: 0 additions & 38 deletions service/erasure.go

This file was deleted.

71 changes: 17 additions & 54 deletions service/header/core_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import (
format "github.com/ipfs/go-ipld-format"

tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/pkg/da"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/service"
"github.com/celestiaorg/rsmt2d"
)

type CoreExchange struct {
Expand All @@ -31,11 +27,7 @@ func NewCoreExchange(fetcher *core.BlockFetcher, dag format.DAGService) *CoreExc
func (ce *CoreExchange) RequestHeader(ctx context.Context, height uint64) (*ExtendedHeader, error) {
log.Debugw("core: requesting header", "height", height)
intHeight := int64(height)
block, err := ce.fetcher.GetBlock(ctx, &intHeight)
if err != nil {
return nil, err
}
return ce.generateExtendedHeaderFromBlock(block)
return ce.getExtendedHeaderByHeight(ctx, &intHeight)
}

func (ce *CoreExchange) RequestHeaders(ctx context.Context, from, amount uint64) ([]*ExtendedHeader, error) {
Expand All @@ -59,69 +51,40 @@ func (ce *CoreExchange) RequestByHash(ctx context.Context, hash tmbytes.HexBytes
if err != nil {
return nil, err
}
extHeader, err := ce.generateExtendedHeaderFromBlock(block)

comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height)
if err != nil {
return nil, err
}

eh, err := MakeExtendedHeader(ctx, block, comm, vals, ce.shareStore)
if err != nil {
return nil, err
}

// verify hashes match
if !hashMatch(hash, extHeader.Hash().Bytes()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, extHeader.Hash().Bytes())
if !bytes.Equal(hash, eh.Hash()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, eh.Hash())
}
return extHeader, nil
}

func hashMatch(expected, got []byte) bool {
return bytes.Equal(expected, got)
return eh, nil
}

func (ce *CoreExchange) RequestHead(ctx context.Context) (*ExtendedHeader, error) {
log.Debug("core: requesting head")
chainHead, err := ce.fetcher.GetBlock(ctx, nil)
if err != nil {
return nil, err
}
return ce.generateExtendedHeaderFromBlock(chainHead)
return ce.getExtendedHeaderByHeight(ctx, nil)
}

func (ce *CoreExchange) generateExtendedHeaderFromBlock(block *types.Block) (*ExtendedHeader, error) {
// erasure code the block
extended, err := ce.extendBlockData(block)
if err != nil {
log.Errorw("computing extended data square", "err msg", err, "block height",
block.Height, "block hash", block.Hash().String())
return nil, err
}
// write block data to store
dah := da.NewDataAvailabilityHeader(extended)
log.Debugw("generated DataAvailabilityHeader", "data root", dah.Hash())
// create ExtendedHeader
commit, err := ce.fetcher.Commit(context.Background(), &block.Height)
func (ce *CoreExchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*ExtendedHeader, error) {
b, err := ce.fetcher.GetBlock(ctx, height)
if err != nil {
log.Errorw("fetching commit", "err", err, "height", block.Height)
return nil, err
}

valSet, err := ce.fetcher.ValidatorSet(context.Background(), &block.Height)
comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &b.Height)
if err != nil {
log.Errorw("fetching validator set", "err", err, "height", block.Height)
return nil, err
}
extHeader := &ExtendedHeader{
RawHeader: block.Header,
DAH: &dah,
Commit: commit,
ValidatorSet: valSet,
}
// sanity check generated ExtendedHeader
err = extHeader.ValidateBasic()
if err != nil {
return nil, err
}
return extHeader, nil
}

// extendBlockData erasure codes the given raw block's data and returns the
// erasure coded block data upon success.
func (ce *CoreExchange) extendBlockData(raw *types.Block) (*rsmt2d.ExtendedDataSquare, error) {
return service.ExtendBlock(raw, ce.shareStore)
return MakeExtendedHeader(ctx, b, comm, vals, ce.shareStore)
}
9 changes: 5 additions & 4 deletions service/header/core_exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package header

import (
"bytes"
"context"
"testing"

Expand All @@ -12,7 +13,7 @@ import (
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
fetcher := createCoreFetcher()
fetcher := createCoreFetcher(t)
store := mdutils.Mock()

// generate 10 blocks
Expand All @@ -29,11 +30,11 @@ func Test_hashMatch(t *testing.T) {
expected := []byte("AE0F153556A4FA5C0B7C3BFE0BAF0EC780C031933B281A8D759BB34C1DA31C56")
mismatch := []byte("57A0D7FE69FE88B3D277C824B3ACB9B60E5E65837A802485DE5CBB278C43576A")

assert.False(t, hashMatch(expected, mismatch))
assert.False(t, bytes.Equal(expected, mismatch))
}

func createCoreFetcher() *core.BlockFetcher {
mock := core.MockEmbeddedClient()
func createCoreFetcher(t *testing.T) *core.BlockFetcher {
mock := core.EphemeralMockEmbeddedClient(t)
return core.NewBlockFetcher(mock)
}

Expand Down
Loading

0 comments on commit da19647

Please sign in to comment.