Skip to content

Commit

Permalink
core listener
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay committed Jan 6, 2022
1 parent e112545 commit 9f96fcd
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 10 deletions.
9 changes: 9 additions & 0 deletions core/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func StartMockNode() *node.Node {
return rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
}

func EphemeralMockEmbeddedClient(t *testing.T) Client {
nd := StartMockNode()
t.Cleanup(func() {
nd.Stop() //nolint:errcheck
rpctest.StopTendermint(nd)
})
return NewEmbeddedFromNode(nd)
}

// MockEmbeddedClient returns a started mock Core Client.
func MockEmbeddedClient() Client {
return NewEmbeddedFromNode(StartMockNode())
Expand Down
4 changes: 4 additions & 0 deletions node/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestNewBridge(t *testing.T) {
require.NotNil(t, node)
require.NotNil(t, node.Config)
require.NotNil(t, node.Host)
require.NotNil(t, node.HeaderServ)
require.Nil(t, node.BlockServ)
assert.NotZero(t, node.Type)
}
Expand Down Expand Up @@ -104,6 +105,9 @@ func TestBridge_WithRemoteCore(t *testing.T) {

store := MockStore(t, DefaultConfig(Bridge))
remoteCore, protocol, ip := core.StartRemoteCore()
t.Cleanup(func() {
remoteCore.Stop() // nolint:errcheck
})
require.NotNil(t, remoteCore)
assert.True(t, remoteCore.IsRunning())

Expand Down
2 changes: 1 addition & 1 deletion node/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func lightComponents(cfg *Config, store Store) fxutil.Option {
)
}

// fullComponents keeps all the components as DI options required to built a Full Node.
// fullComponents keeps all the components as DI options required to build a Full Node.
func bridgeComponents(cfg *Config, store Store) fxutil.Option {
return fxutil.Options(
fxutil.Supply(Bridge),
Expand Down
20 changes: 19 additions & 1 deletion node/core/core.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package core

import (
format "github.com/ipfs/go-ipld-format"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/node/fxutil"
"github.com/celestiaorg/celestia-node/service/header"
Expand All @@ -26,7 +29,8 @@ func DefaultConfig() Config {
func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
return fxutil.Options(
fxutil.Provide(core.NewBlockFetcher),
fxutil.ProvideAs(header.NewCoreExchange, new(header.Exchange)),
fxutil.Provide(HeaderCoreExchange),
fxutil.Provide(HeaderCoreListener),
fxutil.ProvideIf(cfg.Remote, func() (core.Client, error) {
return RemoteClient(cfg)
}),
Expand All @@ -49,6 +53,20 @@ func Components(cfg Config, loader core.RepoLoader) fxutil.Option {
)
}

func HeaderCoreExchange(fetcher *core.BlockFetcher, dag format.DAGService) (*header.CoreExchange, header.Exchange) {
ce := header.NewCoreExchange(fetcher, dag)
return ce, ce
}

func HeaderCoreListener(lc fx.Lifecycle, ex *header.CoreExchange, p2pSub *header.P2PSubscriber) *header.CoreListener {
cl := header.NewCoreListener(ex, p2pSub)
lc.Append(fx.Hook{
OnStart: cl.Start,
OnStop: cl.Stop,
})
return cl
}

// RemoteClient provides a constructor for core.Client over RPC.
func RemoteClient(cfg Config) (core.Client, error) {
return core.NewRemote(cfg.RemoteConfig.Protocol, cfg.RemoteConfig.RemoteAddr)
Expand Down
6 changes: 3 additions & 3 deletions service/header/core_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
fetcher := createCoreFetcher()
fetcher := createCoreFetcher(t)
store := mdutils.Mock()

// generate 10 blocks
Expand All @@ -32,8 +32,8 @@ func Test_hashMatch(t *testing.T) {
assert.False(t, hashMatch(expected, mismatch))
}

func createCoreFetcher() *core.BlockFetcher {
mock := core.MockEmbeddedClient()
func createCoreFetcher(t *testing.T) *core.BlockFetcher {
mock := core.EphemeralMockEmbeddedClient(t)
return core.NewBlockFetcher(mock)
}

Expand Down
112 changes: 112 additions & 0 deletions service/header/core_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package header

import (
"context"
"fmt"

"github.com/tendermint/tendermint/types"
)

// CoreListener TODO @renaynay: document
type CoreListener struct {
ex *CoreExchange
p2pSub *P2PSubscriber

blockSub <-chan *types.Block

ctx context.Context
cancel context.CancelFunc
}

func NewCoreListener(ex *CoreExchange, p2pSub *P2PSubscriber) *CoreListener {
return &CoreListener{
ex: ex,
p2pSub: p2pSub,
}
}

// Start kicks off the CoreListener listener loop.
func (cl *CoreListener) Start(ctx context.Context) error {
cl.ctx, cl.cancel = context.WithCancel(context.Background())

go cl.listen(ctx)
return nil
}

// Stop stops the CoreListener listener loop.
func (cl *CoreListener) Stop(context.Context) error {
cl.cancel()
return nil
}

// listen kicks off a loop, listening for new block events from Core,
// generating ExtendedHeaders and broadcasting them to the header-sub
// gossipsub network.
func (cl *CoreListener) listen(ctx context.Context) {
// start subscription to core node new block events
err := cl.startBlockSubscription(ctx)
if err != nil {
log.Errorw("core-listener: starting subscription to core client", "err", err)
return
}

for {
select {
case <-cl.ctx.Done():
if err := cl.cancelBlockSubscription(context.Background()); err != nil {
log.Errorw("core-listener: canceling subscription to core", "err", err)
}
return
default:
// get next header from core
next, err := cl.nextHeader(cl.ctx)
if err != nil {
log.Errorw("core-listener: getting next header", "err", err)
return
}
if next == nil {
log.Debugw("core-listener: no subsequent header, exiting listening service")
return
}
// broadcast new ExtendedHeader
err = cl.p2pSub.Broadcast(cl.ctx, next)
if err != nil {
log.Errorw("core-listener: broadcasting next header", "height", next.Height,
"err", err)
return
}
}
}
}

// startBlockSubscription starts the CoreListener's subscription to new block events
// from Core.
func (cl *CoreListener) startBlockSubscription(ctx context.Context) error {
if cl.blockSub != nil {
return fmt.Errorf("subscription already started")
}

var err error
cl.blockSub, err = cl.ex.fetcher.SubscribeNewBlockEvent(ctx)

return err
}

// cancelBlockSubscription stops the CoreListener's subscription to new block events
// from Core.
func (cl *CoreListener) cancelBlockSubscription(ctx context.Context) error {
return cl.ex.fetcher.UnsubscribeNewBlockEvent(ctx)
}

// nextHeader returns the next latest header from Core.
func (cl *CoreListener) nextHeader(ctx context.Context) (*ExtendedHeader, error) {
select {
case <-ctx.Done():
return nil, nil
case newBlock, ok := <-cl.blockSub:
if !ok {
return nil, fmt.Errorf("subscription closed")
}
return cl.ex.generateExtendedHeaderFromBlock(newBlock)
}
}
77 changes: 77 additions & 0 deletions service/header/core_listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package header

import (
"context"
"testing"

mdutils "github.com/ipfs/go-merkledag/test"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/core"
)

// TestCoreListener tests the lifecycle of the core listener.
func TestCoreListener(t *testing.T) {
// create mocknet with two pubsub endpoints
ps1, ps2 := createMocknetWithTwoPubsubEndpoints(t)
// create second subscription endpoint to listen for CoreListener's pubsub messages
topic2, err := ps2.Join(PubSubTopic)
require.NoError(t, err)
sub, err := topic2.Subscribe()
require.NoError(t, err)

// create one block to store as Head in local store and then unsubscribe from block events
fetcher := createCoreFetcher(t)

// create CoreListener and start listening
cl := createCoreListener(t, fetcher, ps1)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

err = cl.Start(ctx)
require.NoError(t, err)

// ensure headers are getting broadcasted to the gossipsub topic
msg, err := sub.Next(context.Background())
require.NoError(t, err)

var resp ExtendedHeader
err = resp.UnmarshalBinary(msg.Data)
require.NoError(t, err)
require.Equal(t, 1, int(resp.Height))

require.NoError(t, cl.cancelBlockSubscription(context.Background()))
}

func createMocknetWithTwoPubsubEndpoints(t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) {
host1, host2 := createMocknet(context.Background(), t)
// create pubsub for host
ps1, err := pubsub.NewGossipSub(context.Background(), host1,
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
// create pubsub for peer-side (to test broadcast comes through network)
ps2, err := pubsub.NewGossipSub(context.Background(), host2,
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
return ps1, ps2
}

func createCoreListener(
t *testing.T,
fetcher *core.BlockFetcher,
ps *pubsub.PubSub,
) *CoreListener {
// create all sub-services necessary for CoreListener
ex := NewCoreExchange(fetcher, mdutils.Mock())

p2pSub := NewP2PSubscriber(ps, nil)
err := p2pSub.Start(context.Background())
require.NoError(t, err)
t.Cleanup(func() {
p2pSub.Stop(context.Background()) //nolint:errcheck
})

return NewCoreListener(ex, p2pSub)
}
2 changes: 1 addition & 1 deletion service/header/p2p_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestP2PExchange_RequestByHash(t *testing.T) {
}

func createMocknet(ctx context.Context, t *testing.T) (libhost.Host, libhost.Host) {
net, err := mocknet.FullMeshLinked(ctx, 2)
net, err := mocknet.FullMeshConnected(ctx, 2)
require.NoError(t, err)
// get host and peer
return net.Hosts()[0], net.Hosts()[1]
Expand Down
19 changes: 15 additions & 4 deletions service/header/p2p_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ func NewP2PSubscriber(ps *pubsub.PubSub, validator pubsub.ValidatorEx) *P2PSubsc

// Start starts the P2PSubscriber, registering a topic validator for the "header-sub"
// topic and joining it.
func (p *P2PSubscriber) Start(context.Context) error {
err := p.pubsub.RegisterTopicValidator(PubSubTopic, p.validator)
if err != nil {
return err
func (p *P2PSubscriber) Start(context.Context) (err error) {
if p.validator != nil {
err = p.pubsub.RegisterTopicValidator(PubSubTopic, p.validator)
if err != nil {
return err
}
}

p.topic, err = p.pubsub.Join(PubSubTopic)
Expand All @@ -60,3 +62,12 @@ func (p *P2PSubscriber) Subscribe() (Subscription, error) {

return newSubscription(p.topic)
}

// Broadcast broadcasts the given ExtendedHeader to the topic.
func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) error {
bin, err := header.MarshalBinary()
if err != nil {
return err
}
return p.topic.Publish(ctx, bin)
}
4 changes: 4 additions & 0 deletions service/header/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Service struct {
p2pServer *P2PExchangeServer
}

// TODO @renaynay: how will we register core listener on the header Service? It's a part of header service but
// we can't pass it directly to constructor b/c only Bridge nodes provide CoreListener, otherwise it'll always be nil
// maybe we can make it an interface? Still hacky.

// NewHeaderService creates a new instance of header Service.
func NewHeaderService(
syncer *Syncer,
Expand Down

0 comments on commit 9f96fcd

Please sign in to comment.