From af0277b15ec17214965ef15083a2cd2f31b9f93f Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Mon, 10 Jan 2022 12:39:41 +0100 Subject: [PATCH] services/header: Refactor `HeaderService` to be responsible for broadcasting new `ExtendedHeader`s to the gossipsub network (#327) * core listener * set ctx and cancelfunc to nil after stopped service * changelog * changelog remove todod * feat(core): add GetBlockInfo to Fetcher (#3) feat(service/header): make a helper to assemble ExtendedHeaders refactor(service/header): make core exchange to rely on new helpers refactor(service/header): make core listener to rely on new helpers refactor(node/core): rework construction of core exchange refactor(service): remove erasure as redudandant chore: lint * chore: lint * Broadcaster code review + an alternative (#4) * fix(core): wrap errors instead of logging them * fix(service/header): fix lifecylces issues and make listener more minimal * fix(service/header): core listener should stop if at least one error happens * log(service/header): make annoying 'validation error' a warning * use commit height in GetBlockInfo Co-authored-by: Hlib Kanunnikov --- CHANGELOG-PENDING.md | 3 +- core/fetcher.go | 20 +++++ core/testing.go | 9 +++ node/bridge_test.go | 4 + node/components.go | 2 +- node/core/core.go | 57 ++++++-------- service/block/store_test.go | 6 +- service/erasure.go | 38 ---------- service/header/core_exchange.go | 71 +++++------------- service/header/core_exchange_test.go | 9 ++- service/header/core_listener.go | 107 +++++++++++++++++++++++++++ service/header/core_listener_test.go | 78 +++++++++++++++++++ service/header/header.go | 28 +++++++ service/header/p2p_exchange.go | 5 +- service/header/p2p_exchange_test.go | 2 +- service/header/p2p_subscriber.go | 19 ++++- 16 files changed, 318 insertions(+), 140 deletions(-) delete mode 100644 service/erasure.go create mode 100644 service/header/core_listener.go create mode 100644 service/header/core_listener_test.go diff --git a/CHANGELOG-PENDING.md b/CHANGELOG-PENDING.md index 17e2a28dfa..74979e5f81 100644 --- a/CHANGELOG-PENDING.md +++ b/CHANGELOG-PENDING.md @@ -18,13 +18,14 @@ Month, DD, YYYY ### IMPROVEMENTS +- [services/header: Refactor `HeaderService` to be responsible for broadcasting new `ExtendedHeader`s to the gossipsub network](https://github.com/celestiaorg/celestia-node/pull/327) [@renaynay](https://github.com/renaynay) - [cmd: introduce Env - an Environment for CLI commands #313](https://github.com/celestiaorg/celestia-node/pull/313) [@Wondertan](https://github.com/Wondertan) - [chore: bump deps #297](https://github.com/celestiaorg/celestia-node/pull/297) [@Wondertan](https://github.com/Wondertan) - [workflows/lint: update golangci-lint to v1.43 #308](https://github.com/celestiaorg/celestia-node/pull/308) [@Wondertan](https://github.com/Wondertan) - [feat(node): extract overrides from Config into Settings #292](https://github.com/celestiaorg/celestia-node/pull/292) [@Wondertan](https://github.com/Wondertan) - [node: Adding WithHost options to settings section #301](https://github.com/celestiaorg/celestia-node/pull/301) [@Bidon15](https://github.com/Bidon15) - [node: Adding WithCoreClient option #305](https://github.com/celestiaorg/celestia-node/pull/305) [@Bidon15](https://github.com/Bidon15) -- [refactor(services/header): Refactor `HeaderService` to only manage its sub-services' lifecycles #317](https://github.com/celestiaorg/celestia-node/pull/317) [@renaynay](https://github.com/renaynay) +- [service/header: Refactor `HeaderService` to only manage its sub-services' lifecycles #317](https://github.com/celestiaorg/celestia-node/pull/317) [@renaynay](https://github.com/renaynay) - [docker] Created `docker/` dir with `Dockerfile` and `entrypoint.sh` script. - [chore(share): handle rows concurrently in GetSharesByNamespace #241](https://github.com/celestiaorg/celestia-node/pull/241) [@vgonkivs](https://github.com/vgonkivs) diff --git a/core/fetcher.go b/core/fetcher.go index 11e70f4b21..ff0e88fa97 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -26,6 +26,26 @@ func NewBlockFetcher(client Client) *BlockFetcher { } } +// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. +func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) { + commit, err := f.Commit(ctx, height) + if err != nil { + return nil, nil, fmt.Errorf("core/fetcher: getting commit: %w", err) + } + + // If a nil `height` is given as a parameter, there is a chance + // that a new block could be produced between getting the latest + // commit and getting the latest validator set. Therefore, it is + // best to get the validator set at the latest commit's height to + // prevent this potential inconsistency. + valSet, err := f.ValidatorSet(ctx, &commit.Height) + if err != nil { + return nil, nil, fmt.Errorf("core/fetcher: getting validator set: %w", err) + } + + return commit, valSet, nil +} + // GetBlock queries Core for a `Block` at the given height. func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) { res, err := f.client.Block(ctx, height) diff --git a/core/testing.go b/core/testing.go index 5e812aeeba..b4e26a443f 100644 --- a/core/testing.go +++ b/core/testing.go @@ -26,6 +26,15 @@ func StartMockNode() *node.Node { return rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig) } +func EphemeralMockEmbeddedClient(t *testing.T) Client { + nd := StartMockNode() + t.Cleanup(func() { + nd.Stop() //nolint:errcheck + rpctest.StopTendermint(nd) + }) + return NewEmbeddedFromNode(nd) +} + // MockEmbeddedClient returns a started mock Core Client. func MockEmbeddedClient() Client { return NewEmbeddedFromNode(StartMockNode()) diff --git a/node/bridge_test.go b/node/bridge_test.go index 01e7388652..d08d61c965 100644 --- a/node/bridge_test.go +++ b/node/bridge_test.go @@ -21,6 +21,7 @@ func TestNewBridge(t *testing.T) { require.NotNil(t, node) require.NotNil(t, node.Config) require.NotNil(t, node.Host) + require.NotNil(t, node.HeaderServ) require.Nil(t, node.BlockServ) assert.NotZero(t, node.Type) } @@ -104,6 +105,9 @@ func TestBridge_WithRemoteCore(t *testing.T) { store := MockStore(t, DefaultConfig(Bridge)) remoteCore, protocol, ip := core.StartRemoteCore() + t.Cleanup(func() { + remoteCore.Stop() // nolint:errcheck + }) require.NotNil(t, remoteCore) assert.True(t, remoteCore.IsRunning()) diff --git a/node/components.go b/node/components.go index 070599d7a4..83d7ffbde5 100644 --- a/node/components.go +++ b/node/components.go @@ -19,7 +19,7 @@ func lightComponents(cfg *Config, store Store) fxutil.Option { ) } -// fullComponents keeps all the components as DI options required to built a Full Node. +// fullComponents keeps all the components as DI options required to build a Full Node. func bridgeComponents(cfg *Config, store Store) fxutil.Option { return fxutil.Options( fxutil.Supply(Bridge), diff --git a/node/core/core.go b/node/core/core.go index ce489fbd91..d8e8bc4299 100644 --- a/node/core/core.go +++ b/node/core/core.go @@ -1,8 +1,7 @@ package core import ( - "context" - + format "github.com/ipfs/go-ipld-format" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/core" @@ -31,24 +30,14 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option { return fxutil.Options( fxutil.Provide(core.NewBlockFetcher), fxutil.ProvideAs(header.NewCoreExchange, new(header.Exchange)), - fxutil.ProvideIf(cfg.Remote, func(lc fx.Lifecycle) (core.Client, error) { - client, err := RemoteClient(cfg) - if err != nil { - return nil, err - } - - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - return client.Start() - }, - OnStop: func(_ context.Context) error { - return client.Stop() - }, - }) - - return client, nil + fxutil.Provide(HeaderCoreListener), + fxutil.ProvideIf(cfg.Remote, func() (core.Client, error) { + return RemoteClient(cfg) + }), + fxutil.InvokeIf(cfg.Remote, func(c core.Client) error { + return c.Start() }), - fxutil.ProvideIf(!cfg.Remote, func(lc fx.Lifecycle) (core.Client, error) { + fxutil.ProvideIf(!cfg.Remote, func() (core.Client, error) { store, err := loader() if err != nil { return nil, err @@ -59,25 +48,25 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option { return nil, err } - client, err := core.NewEmbedded(cfg) - if err != nil { - return nil, err - } - - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - return client.Start() - }, - OnStop: func(_ context.Context) error { - return client.Stop() - }, - }) - - return client, nil + return core.NewEmbedded(cfg) }), ) } +func HeaderCoreListener( + lc fx.Lifecycle, + ex *core.BlockFetcher, + p2pSub *header.P2PSubscriber, + dag format.DAGService, +) *header.CoreListener { + cl := header.NewCoreListener(p2pSub, ex, dag) + lc.Append(fx.Hook{ + OnStart: cl.Start, + OnStop: cl.Stop, + }) + return cl +} + // RemoteClient provides a constructor for core.Client over RPC. func RemoteClient(cfg Config) (core.Client, error) { return core.NewRemote(cfg.RemoteConfig.Protocol, cfg.RemoteConfig.RemoteAddr) diff --git a/service/block/store_test.go b/service/block/store_test.go index 23529f4402..7149995753 100644 --- a/service/block/store_test.go +++ b/service/block/store_test.go @@ -9,10 +9,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/celestiaorg/celestia-node/ipld" + "github.com/tendermint/tendermint/pkg/da" "github.com/celestiaorg/celestia-node/core" - "github.com/celestiaorg/celestia-node/service" "github.com/celestiaorg/celestia-node/service/header" ) @@ -47,7 +48,8 @@ func generateRawAndExtendedBlock(t *testing.T, store format.DAGService) *Block { rawBlock, err := fetcher.GetBlock(context.Background(), nil) require.NoError(t, err) // extend block - extended, err := service.ExtendBlock(rawBlock, store) + shares, _ := rawBlock.ComputeShares() + extended, err := ipld.PutData(context.Background(), shares.RawShares(), store) require.NoError(t, err) // generate dah dah := da.NewDataAvailabilityHeader(extended) diff --git a/service/erasure.go b/service/erasure.go deleted file mode 100644 index 715574d90a..0000000000 --- a/service/erasure.go +++ /dev/null @@ -1,38 +0,0 @@ -package service - -import ( - "context" - "math" - - format "github.com/ipfs/go-ipld-format" - - "github.com/tendermint/tendermint/pkg/wrapper" - core "github.com/tendermint/tendermint/types" - - "github.com/celestiaorg/celestia-node/ipld" - "github.com/celestiaorg/nmt" - "github.com/celestiaorg/rsmt2d" -) - -// ExtendBlock erasure codes the given raw block's data and returns the -// erasure coded block data upon success. -func ExtendBlock(block *core.Block, dag format.DAGService) (*rsmt2d.ExtendedDataSquare, error) { - namespacedShares, _ := block.Data.ComputeShares() - shares := namespacedShares.RawShares() - - // create nmt adder wrapping batch adder - batchAdder := ipld.NewNmtNodeAdder(context.Background(), format.NewBatch(context.Background(), dag)) - - // create the nmt wrapper to generate row and col commitments - squareSize := squareSize64(len(namespacedShares)) - tree := wrapper.NewErasuredNamespacedMerkleTree(squareSize, nmt.NodeVisitor(batchAdder.Visit)) - - // compute extended square - return rsmt2d.ComputeExtendedDataSquare(shares, rsmt2d.NewRSGF8Codec(), tree.Constructor) -} - -// squareSize64 computes the square size as uint64 from -// the given length of shares. -func squareSize64(length int) uint64 { - return uint64(math.Sqrt(float64(length))) -} diff --git a/service/header/core_exchange.go b/service/header/core_exchange.go index 65594d2f76..ec6b014104 100644 --- a/service/header/core_exchange.go +++ b/service/header/core_exchange.go @@ -8,12 +8,8 @@ import ( format "github.com/ipfs/go-ipld-format" tmbytes "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/pkg/da" - "github.com/tendermint/tendermint/types" "github.com/celestiaorg/celestia-node/core" - "github.com/celestiaorg/celestia-node/service" - "github.com/celestiaorg/rsmt2d" ) type CoreExchange struct { @@ -31,11 +27,7 @@ func NewCoreExchange(fetcher *core.BlockFetcher, dag format.DAGService) *CoreExc func (ce *CoreExchange) RequestHeader(ctx context.Context, height uint64) (*ExtendedHeader, error) { log.Debugw("core: requesting header", "height", height) intHeight := int64(height) - block, err := ce.fetcher.GetBlock(ctx, &intHeight) - if err != nil { - return nil, err - } - return ce.generateExtendedHeaderFromBlock(block) + return ce.getExtendedHeaderByHeight(ctx, &intHeight) } func (ce *CoreExchange) RequestHeaders(ctx context.Context, from, amount uint64) ([]*ExtendedHeader, error) { @@ -59,69 +51,40 @@ func (ce *CoreExchange) RequestByHash(ctx context.Context, hash tmbytes.HexBytes if err != nil { return nil, err } - extHeader, err := ce.generateExtendedHeaderFromBlock(block) + + comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height) if err != nil { return nil, err } + + eh, err := MakeExtendedHeader(ctx, block, comm, vals, ce.shareStore) + if err != nil { + return nil, err + } + // verify hashes match - if !hashMatch(hash, extHeader.Hash().Bytes()) { - return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, extHeader.Hash().Bytes()) + if !bytes.Equal(hash, eh.Hash()) { + return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, eh.Hash()) } - return extHeader, nil -} -func hashMatch(expected, got []byte) bool { - return bytes.Equal(expected, got) + return eh, nil } func (ce *CoreExchange) RequestHead(ctx context.Context) (*ExtendedHeader, error) { log.Debug("core: requesting head") - chainHead, err := ce.fetcher.GetBlock(ctx, nil) - if err != nil { - return nil, err - } - return ce.generateExtendedHeaderFromBlock(chainHead) + return ce.getExtendedHeaderByHeight(ctx, nil) } -func (ce *CoreExchange) generateExtendedHeaderFromBlock(block *types.Block) (*ExtendedHeader, error) { - // erasure code the block - extended, err := ce.extendBlockData(block) - if err != nil { - log.Errorw("computing extended data square", "err msg", err, "block height", - block.Height, "block hash", block.Hash().String()) - return nil, err - } - // write block data to store - dah := da.NewDataAvailabilityHeader(extended) - log.Debugw("generated DataAvailabilityHeader", "data root", dah.Hash()) - // create ExtendedHeader - commit, err := ce.fetcher.Commit(context.Background(), &block.Height) +func (ce *CoreExchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*ExtendedHeader, error) { + b, err := ce.fetcher.GetBlock(ctx, height) if err != nil { - log.Errorw("fetching commit", "err", err, "height", block.Height) return nil, err } - valSet, err := ce.fetcher.ValidatorSet(context.Background(), &block.Height) + comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &b.Height) if err != nil { - log.Errorw("fetching validator set", "err", err, "height", block.Height) return nil, err } - extHeader := &ExtendedHeader{ - RawHeader: block.Header, - DAH: &dah, - Commit: commit, - ValidatorSet: valSet, - } - // sanity check generated ExtendedHeader - err = extHeader.ValidateBasic() - if err != nil { - return nil, err - } - return extHeader, nil -} -// extendBlockData erasure codes the given raw block's data and returns the -// erasure coded block data upon success. -func (ce *CoreExchange) extendBlockData(raw *types.Block) (*rsmt2d.ExtendedDataSquare, error) { - return service.ExtendBlock(raw, ce.shareStore) + return MakeExtendedHeader(ctx, b, comm, vals, ce.shareStore) } diff --git a/service/header/core_exchange_test.go b/service/header/core_exchange_test.go index f50f2318a6..977cfb8e48 100644 --- a/service/header/core_exchange_test.go +++ b/service/header/core_exchange_test.go @@ -1,6 +1,7 @@ package header import ( + "bytes" "context" "testing" @@ -12,7 +13,7 @@ import ( ) func TestCoreExchange_RequestHeaders(t *testing.T) { - fetcher := createCoreFetcher() + fetcher := createCoreFetcher(t) store := mdutils.Mock() // generate 10 blocks @@ -29,11 +30,11 @@ func Test_hashMatch(t *testing.T) { expected := []byte("AE0F153556A4FA5C0B7C3BFE0BAF0EC780C031933B281A8D759BB34C1DA31C56") mismatch := []byte("57A0D7FE69FE88B3D277C824B3ACB9B60E5E65837A802485DE5CBB278C43576A") - assert.False(t, hashMatch(expected, mismatch)) + assert.False(t, bytes.Equal(expected, mismatch)) } -func createCoreFetcher() *core.BlockFetcher { - mock := core.MockEmbeddedClient() +func createCoreFetcher(t *testing.T) *core.BlockFetcher { + mock := core.EphemeralMockEmbeddedClient(t) return core.NewBlockFetcher(mock) } diff --git a/service/header/core_listener.go b/service/header/core_listener.go new file mode 100644 index 0000000000..93c8824936 --- /dev/null +++ b/service/header/core_listener.go @@ -0,0 +1,107 @@ +package header + +import ( + "context" + "errors" + "fmt" + + format "github.com/ipfs/go-ipld-format" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/celestia-node/core" +) + +// CoreListener is responsible for listening to Core for +// new block events and converting new Core blocks into +// the main data structure used in the Celestia DA network: +// `ExtendedHeader`. After digesting the Core block, extending +// it, and generating the `ExtendedHeader`, the CoreListener +// broadcasts the new `ExtendedHeader` to the header-sub gossipsub +// network. +type CoreListener struct { + p2pSub *P2PSubscriber + fetcher *core.BlockFetcher + dag format.DAGService + + blockSub <-chan *types.Block + + ctx context.Context + cancel context.CancelFunc +} + +func NewCoreListener(p2pSub *P2PSubscriber, fetcher *core.BlockFetcher, dag format.DAGService) *CoreListener { + return &CoreListener{ + p2pSub: p2pSub, + fetcher: fetcher, + dag: dag, + } +} + +// Start kicks off the CoreListener listener loop. +func (cl *CoreListener) Start(ctx context.Context) error { + if cl.blockSub != nil { + return fmt.Errorf("core-listener: already started") + } + + sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) + if err != nil { + return err + } + + cl.ctx, cl.cancel = context.WithCancel(context.Background()) + cl.blockSub = sub + go cl.listen() + return nil +} + +// Stop stops the CoreListener listener loop. +func (cl *CoreListener) Stop(ctx context.Context) error { + cl.cancel() + cl.blockSub = nil + return cl.fetcher.UnsubscribeNewBlockEvent(ctx) +} + +// listen kicks off a loop, listening for new block events from Core, +// generating ExtendedHeaders and broadcasting them to the header-sub +// gossipsub network. +func (cl *CoreListener) listen() { + defer log.Info("core-listener: listening stopped") + for { + select { + case b, ok := <-cl.blockSub: + if !ok { + return + } + + comm, vals, err := cl.fetcher.GetBlockInfo(cl.ctx, &b.Height) + if err != nil { + log.Errorw("core-listener: getting block info", "err", err) + return + } + + eh, err := MakeExtendedHeader(cl.ctx, b, comm, vals, cl.dag) + if err != nil { + log.Errorw("core-listener: making extended header", "err", err) + return + } + + // broadcast new ExtendedHeader + err = cl.p2pSub.Broadcast(cl.ctx, eh) + if err != nil { + var pserr pubsub.ValidationError + if errors.As(err, &pserr) && pserr.Reason == pubsub.RejectValidationIgnored { + log.Warnw("core-listener: broadcasting next header", "height", eh.Height, + "err", err) + } else { + log.Errorw("core-listener: broadcasting next header", "height", eh.Height, + "err", err) + } + + return + } + case <-cl.ctx.Done(): + return + } + } +} diff --git a/service/header/core_listener_test.go b/service/header/core_listener_test.go new file mode 100644 index 0000000000..6d386ca05d --- /dev/null +++ b/service/header/core_listener_test.go @@ -0,0 +1,78 @@ +package header + +import ( + "context" + "testing" + + mdutils "github.com/ipfs/go-merkledag/test" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/core" +) + +// TestCoreListener tests the lifecycle of the core listener. +func TestCoreListener(t *testing.T) { + // create mocknet with two pubsub endpoints + ps1, ps2 := createMocknetWithTwoPubsubEndpoints(t) + // create second subscription endpoint to listen for CoreListener's pubsub messages + topic2, err := ps2.Join(PubSubTopic) + require.NoError(t, err) + sub, err := topic2.Subscribe() + require.NoError(t, err) + + // create one block to store as Head in local store and then unsubscribe from block events + fetcher := createCoreFetcher(t) + + // create CoreListener and start listening + cl := createCoreListener(t, fetcher, ps1) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + err = cl.Start(ctx) + require.NoError(t, err) + + // ensure headers are getting broadcasted to the gossipsub topic + for i := 1; i < 6; i++ { + msg, err := sub.Next(context.Background()) + require.NoError(t, err) + + var resp ExtendedHeader + err = resp.UnmarshalBinary(msg.Data) + require.NoError(t, err) + require.Equal(t, i, int(resp.Height)) + } + + err = cl.Stop(ctx) + require.NoError(t, err) + require.Nil(t, cl.blockSub) +} + +func createMocknetWithTwoPubsubEndpoints(t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) { + host1, host2 := createMocknet(context.Background(), t) + // create pubsub for host + ps1, err := pubsub.NewGossipSub(context.Background(), host1, + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) + require.NoError(t, err) + // create pubsub for peer-side (to test broadcast comes through network) + ps2, err := pubsub.NewGossipSub(context.Background(), host2, + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) + require.NoError(t, err) + return ps1, ps2 +} + +func createCoreListener( + t *testing.T, + fetcher *core.BlockFetcher, + ps *pubsub.PubSub, +) *CoreListener { + p2pSub := NewP2PSubscriber(ps, nil) + err := p2pSub.Start(context.Background()) + require.NoError(t, err) + t.Cleanup(func() { + p2pSub.Stop(context.Background()) //nolint:errcheck + }) + + return NewCoreListener(p2pSub, fetcher, mdutils.Mock()) +} diff --git a/service/header/header.go b/service/header/header.go index 197d0ab73c..81523ca455 100644 --- a/service/header/header.go +++ b/service/header/header.go @@ -2,12 +2,16 @@ package header import ( "bytes" + "context" "fmt" + format "github.com/ipfs/go-ipld-format" bts "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/pkg/da" core "github.com/tendermint/tendermint/types" + "github.com/celestiaorg/celestia-node/ipld" + header_pb "github.com/celestiaorg/celestia-node/service/header/pb" ) @@ -31,6 +35,30 @@ type ExtendedHeader struct { DAH *DataAvailabilityHeader `json:"dah"` } +// MakeExtendedHeader assembles new ExtendedHeader. +func MakeExtendedHeader( + ctx context.Context, + b *core.Block, + comm *core.Commit, + vals *core.ValidatorSet, + dag format.NodeAdder, +) (*ExtendedHeader, error) { + namespacedShares, _ := b.Data.ComputeShares() + extended, err := ipld.PutData(ctx, namespacedShares.RawShares(), dag) + if err != nil { + return nil, err + } + + dah := da.NewDataAvailabilityHeader(extended) + eh := &ExtendedHeader{ + RawHeader: b.Header, + DAH: &dah, + Commit: comm, + ValidatorSet: vals, + } + return eh, eh.ValidateBasic() +} + // Hash returns Hash of the wrapped RawHeader. // NOTE: It purposely overrides Hash method of RawHeader to get it directly from Commit without recomputing. func (eh *ExtendedHeader) Hash() bts.HexBytes { diff --git a/service/header/p2p_exchange.go b/service/header/p2p_exchange.go index c5a03d9c61..bac2d11f15 100644 --- a/service/header/p2p_exchange.go +++ b/service/header/p2p_exchange.go @@ -1,6 +1,7 @@ package header import ( + "bytes" "context" "fmt" "sync" @@ -68,6 +69,7 @@ func (ex *P2PExchange) Start(ctx context.Context) error { func (ex *P2PExchange) Stop(context.Context) error { log.Info("p2p: stopping p2p exchange") ex.cancel() + ex.ctx, ex.cancel = nil, nil return nil } @@ -124,7 +126,8 @@ func (ex *P2PExchange) RequestByHash(ctx context.Context, hash tmbytes.HexBytes) if err != nil { return nil, err } - if !hashMatch(headers[0].Hash().Bytes(), hash) { + + if !bytes.Equal(headers[0].Hash().Bytes(), hash) { return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash().Bytes()) } return headers[0], nil diff --git a/service/header/p2p_exchange_test.go b/service/header/p2p_exchange_test.go index 3e561546aa..41c60eb526 100644 --- a/service/header/p2p_exchange_test.go +++ b/service/header/p2p_exchange_test.go @@ -102,7 +102,7 @@ func TestP2PExchange_RequestByHash(t *testing.T) { } func createMocknet(ctx context.Context, t *testing.T) (libhost.Host, libhost.Host) { - net, err := mocknet.FullMeshLinked(ctx, 2) + net, err := mocknet.FullMeshConnected(ctx, 2) require.NoError(t, err) // get host and peer return net.Hosts()[0], net.Hosts()[1] diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 4dc9b351c9..b45abb0f69 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -31,10 +31,12 @@ func NewP2PSubscriber(ps *pubsub.PubSub, validator pubsub.ValidatorEx) *P2PSubsc // Start starts the P2PSubscriber, registering a topic validator for the "header-sub" // topic and joining it. -func (p *P2PSubscriber) Start(context.Context) error { - err := p.pubsub.RegisterTopicValidator(PubSubTopic, p.validator) - if err != nil { - return err +func (p *P2PSubscriber) Start(context.Context) (err error) { + if p.validator != nil { + err = p.pubsub.RegisterTopicValidator(PubSubTopic, p.validator) + if err != nil { + return err + } } p.topic, err = p.pubsub.Join(PubSubTopic) @@ -60,3 +62,12 @@ func (p *P2PSubscriber) Subscribe() (Subscription, error) { return newSubscription(p.topic) } + +// Broadcast broadcasts the given ExtendedHeader to the topic. +func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) error { + bin, err := header.MarshalBinary() + if err != nil { + return err + } + return p.topic.Publish(ctx, bin) +}