Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/header: Refactor HeaderService to be responsible for broadcasting new ExtendedHeaders to the gossipsub network #327

Merged
merged 8 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG-PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ Month, DD, YYYY

### IMPROVEMENTS

- [services/header: Refactor `HeaderService` to be responsible for broadcasting new `ExtendedHeader`s to the gossipsub network](https://github.com/celestiaorg/celestia-node/pull/327) [@renaynay](https://github.com/renaynay)
- [cmd: introduce Env - an Environment for CLI commands #313](https://github.com/celestiaorg/celestia-node/pull/313) [@Wondertan](https://github.com/Wondertan)
- [chore: bump deps #297](https://github.com/celestiaorg/celestia-node/pull/297) [@Wondertan](https://github.com/Wondertan)
- [workflows/lint: update golangci-lint to v1.43 #308](https://github.com/celestiaorg/celestia-node/pull/308) [@Wondertan](https://github.com/Wondertan)
- [feat(node): extract overrides from Config into Settings #292](https://github.com/celestiaorg/celestia-node/pull/292) [@Wondertan](https://github.com/Wondertan)
- [node: Adding WithHost options to settings section #301](https://github.com/celestiaorg/celestia-node/pull/301) [@Bidon15](https://github.com/Bidon15)
- [node: Adding WithCoreClient option #305](https://github.com/celestiaorg/celestia-node/pull/305) [@Bidon15](https://github.com/Bidon15)
- [refactor(services/header): Refactor `HeaderService` to only manage its sub-services' lifecycles #317](https://github.com/celestiaorg/celestia-node/pull/317) [@renaynay](https://github.com/renaynay)
- [service/header: Refactor `HeaderService` to only manage its sub-services' lifecycles #317](https://github.com/celestiaorg/celestia-node/pull/317) [@renaynay](https://github.com/renaynay)
- [docker] Created `docker/` dir with `Dockerfile` and `entrypoint.sh` script.
- [chore(share): handle rows concurrently in GetSharesByNamespace #241](https://github.com/celestiaorg/celestia-node/pull/241) [@vgonkivs](https://github.com/vgonkivs)

Expand Down
20 changes: 20 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ func NewBlockFetcher(client Client) *BlockFetcher {
}
}

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting commit: %w", err)
}

// If a nil `height` is given as a parameter, there is a chance
// that a new block could be produced between getting the latest
// commit and getting the latest validator set. Therefore, it is
// best to get the validator set at the latest commit's height to
// prevent this potential inconsistency.
valSet, err := f.ValidatorSet(ctx, &commit.Height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting validator set: %w", err)
}

return commit, valSet, nil
}

// GetBlock queries Core for a `Block` at the given height.
func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) {
res, err := f.client.Block(ctx, height)
Expand Down
9 changes: 9 additions & 0 deletions core/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func StartMockNode() *node.Node {
return rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
}

func EphemeralMockEmbeddedClient(t *testing.T) Client {
nd := StartMockNode()
t.Cleanup(func() {
nd.Stop() //nolint:errcheck
rpctest.StopTendermint(nd)
})
return NewEmbeddedFromNode(nd)
}

// MockEmbeddedClient returns a started mock Core Client.
func MockEmbeddedClient() Client {
return NewEmbeddedFromNode(StartMockNode())
Expand Down
4 changes: 4 additions & 0 deletions node/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestNewBridge(t *testing.T) {
require.NotNil(t, node)
require.NotNil(t, node.Config)
require.NotNil(t, node.Host)
require.NotNil(t, node.HeaderServ)
require.Nil(t, node.BlockServ)
assert.NotZero(t, node.Type)
}
Expand Down Expand Up @@ -104,6 +105,9 @@ func TestBridge_WithRemoteCore(t *testing.T) {

store := MockStore(t, DefaultConfig(Bridge))
remoteCore, protocol, ip := core.StartRemoteCore()
t.Cleanup(func() {
remoteCore.Stop() // nolint:errcheck
})
require.NotNil(t, remoteCore)
assert.True(t, remoteCore.IsRunning())

Expand Down
2 changes: 1 addition & 1 deletion node/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func lightComponents(cfg *Config, store Store) fxutil.Option {
)
}

// fullComponents keeps all the components as DI options required to built a Full Node.
// fullComponents keeps all the components as DI options required to build a Full Node.
func bridgeComponents(cfg *Config, store Store) fxutil.Option {
return fxutil.Options(
fxutil.Supply(Bridge),
Expand Down
57 changes: 23 additions & 34 deletions node/core/core.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package core

import (
"context"

format "github.com/ipfs/go-ipld-format"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
Expand Down Expand Up @@ -31,24 +30,14 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
return fxutil.Options(
fxutil.Provide(core.NewBlockFetcher),
fxutil.ProvideAs(header.NewCoreExchange, new(header.Exchange)),
fxutil.ProvideIf(cfg.Remote, func(lc fx.Lifecycle) (core.Client, error) {
client, err := RemoteClient(cfg)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
return client.Start()
},
OnStop: func(_ context.Context) error {
return client.Stop()
},
})

return client, nil
fxutil.Provide(HeaderCoreListener),
fxutil.ProvideIf(cfg.Remote, func() (core.Client, error) {
return RemoteClient(cfg)
}),
fxutil.InvokeIf(cfg.Remote, func(c core.Client) error {
return c.Start()
}),
fxutil.ProvideIf(!cfg.Remote, func(lc fx.Lifecycle) (core.Client, error) {
fxutil.ProvideIf(!cfg.Remote, func() (core.Client, error) {
store, err := loader()
if err != nil {
return nil, err
Expand All @@ -59,25 +48,25 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
return nil, err
}

client, err := core.NewEmbedded(cfg)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
return client.Start()
},
OnStop: func(_ context.Context) error {
return client.Stop()
},
})

return client, nil
return core.NewEmbedded(cfg)
}),
)
}

func HeaderCoreListener(
lc fx.Lifecycle,
ex *core.BlockFetcher,
p2pSub *header.P2PSubscriber,
dag format.DAGService,
) *header.CoreListener {
cl := header.NewCoreListener(p2pSub, ex, dag)
lc.Append(fx.Hook{
OnStart: cl.Start,
OnStop: cl.Stop,
})
return cl
}

// RemoteClient provides a constructor for core.Client over RPC.
func RemoteClient(cfg Config) (core.Client, error) {
return core.NewRemote(cfg.RemoteConfig.Protocol, cfg.RemoteConfig.RemoteAddr)
Expand Down
6 changes: 4 additions & 2 deletions service/block/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/ipld"

"github.com/tendermint/tendermint/pkg/da"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/service"
"github.com/celestiaorg/celestia-node/service/header"
)

Expand Down Expand Up @@ -47,7 +48,8 @@ func generateRawAndExtendedBlock(t *testing.T, store format.DAGService) *Block {
rawBlock, err := fetcher.GetBlock(context.Background(), nil)
require.NoError(t, err)
// extend block
extended, err := service.ExtendBlock(rawBlock, store)
shares, _ := rawBlock.ComputeShares()
extended, err := ipld.PutData(context.Background(), shares.RawShares(), store)
require.NoError(t, err)
// generate dah
dah := da.NewDataAvailabilityHeader(extended)
Expand Down
38 changes: 0 additions & 38 deletions service/erasure.go

This file was deleted.

71 changes: 17 additions & 54 deletions service/header/core_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import (
format "github.com/ipfs/go-ipld-format"

tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/pkg/da"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/service"
"github.com/celestiaorg/rsmt2d"
)

type CoreExchange struct {
Expand All @@ -31,11 +27,7 @@ func NewCoreExchange(fetcher *core.BlockFetcher, dag format.DAGService) *CoreExc
func (ce *CoreExchange) RequestHeader(ctx context.Context, height uint64) (*ExtendedHeader, error) {
log.Debugw("core: requesting header", "height", height)
intHeight := int64(height)
block, err := ce.fetcher.GetBlock(ctx, &intHeight)
if err != nil {
return nil, err
}
return ce.generateExtendedHeaderFromBlock(block)
return ce.getExtendedHeaderByHeight(ctx, &intHeight)
}

func (ce *CoreExchange) RequestHeaders(ctx context.Context, from, amount uint64) ([]*ExtendedHeader, error) {
Expand All @@ -59,69 +51,40 @@ func (ce *CoreExchange) RequestByHash(ctx context.Context, hash tmbytes.HexBytes
if err != nil {
return nil, err
}
extHeader, err := ce.generateExtendedHeaderFromBlock(block)

comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height)
if err != nil {
return nil, err
}

eh, err := MakeExtendedHeader(ctx, block, comm, vals, ce.shareStore)
if err != nil {
return nil, err
}

// verify hashes match
if !hashMatch(hash, extHeader.Hash().Bytes()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, extHeader.Hash().Bytes())
if !bytes.Equal(hash, eh.Hash()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, eh.Hash())
}
return extHeader, nil
}

func hashMatch(expected, got []byte) bool {
return bytes.Equal(expected, got)
return eh, nil
}

func (ce *CoreExchange) RequestHead(ctx context.Context) (*ExtendedHeader, error) {
log.Debug("core: requesting head")
chainHead, err := ce.fetcher.GetBlock(ctx, nil)
if err != nil {
return nil, err
}
return ce.generateExtendedHeaderFromBlock(chainHead)
return ce.getExtendedHeaderByHeight(ctx, nil)
}

func (ce *CoreExchange) generateExtendedHeaderFromBlock(block *types.Block) (*ExtendedHeader, error) {
// erasure code the block
extended, err := ce.extendBlockData(block)
if err != nil {
log.Errorw("computing extended data square", "err msg", err, "block height",
block.Height, "block hash", block.Hash().String())
return nil, err
}
// write block data to store
dah := da.NewDataAvailabilityHeader(extended)
log.Debugw("generated DataAvailabilityHeader", "data root", dah.Hash())
// create ExtendedHeader
commit, err := ce.fetcher.Commit(context.Background(), &block.Height)
func (ce *CoreExchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*ExtendedHeader, error) {
b, err := ce.fetcher.GetBlock(ctx, height)
if err != nil {
log.Errorw("fetching commit", "err", err, "height", block.Height)
return nil, err
}

valSet, err := ce.fetcher.ValidatorSet(context.Background(), &block.Height)
comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &b.Height)
if err != nil {
log.Errorw("fetching validator set", "err", err, "height", block.Height)
return nil, err
}
extHeader := &ExtendedHeader{
RawHeader: block.Header,
DAH: &dah,
Commit: commit,
ValidatorSet: valSet,
}
// sanity check generated ExtendedHeader
err = extHeader.ValidateBasic()
if err != nil {
return nil, err
}
return extHeader, nil
}

// extendBlockData erasure codes the given raw block's data and returns the
// erasure coded block data upon success.
func (ce *CoreExchange) extendBlockData(raw *types.Block) (*rsmt2d.ExtendedDataSquare, error) {
return service.ExtendBlock(raw, ce.shareStore)
return MakeExtendedHeader(ctx, b, comm, vals, ce.shareStore)
}
9 changes: 5 additions & 4 deletions service/header/core_exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package header

import (
"bytes"
"context"
"testing"

Expand All @@ -12,7 +13,7 @@ import (
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
fetcher := createCoreFetcher()
fetcher := createCoreFetcher(t)
store := mdutils.Mock()

// generate 10 blocks
Expand All @@ -29,11 +30,11 @@ func Test_hashMatch(t *testing.T) {
expected := []byte("AE0F153556A4FA5C0B7C3BFE0BAF0EC780C031933B281A8D759BB34C1DA31C56")
mismatch := []byte("57A0D7FE69FE88B3D277C824B3ACB9B60E5E65837A802485DE5CBB278C43576A")

assert.False(t, hashMatch(expected, mismatch))
assert.False(t, bytes.Equal(expected, mismatch))
}

func createCoreFetcher() *core.BlockFetcher {
mock := core.MockEmbeddedClient()
func createCoreFetcher(t *testing.T) *core.BlockFetcher {
mock := core.EphemeralMockEmbeddedClient(t)
return core.NewBlockFetcher(mock)
}

Expand Down
Loading