Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service/header: Refactor broadcaster to only be used within HeaderService when core (bridge mode) is enabled #303

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Month, DD, YYYY

- [feat(cmd): give a birth to cel-shed and p2p key utilities #281](https://github.com/celestiaorg/celestia-node/pull/281) [@Wondertan](https://github.com/Wondertan)
- [feat(cmd|node): MutualPeers Node option and CLI flag #280](https://github.com/celestiaorg/celestia-node/pull/280) [@Wondertan](https://github.com/Wondertan)
- [refactor(service/header): broadcaster only used within `HeaderService` in bridge mode](https://github.com/celestiaorg/celestia-node/pull/303) [@renaynay](https://github.com/renaynay)

### IMPROVEMENTS

Expand Down
5 changes: 5 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,8 @@ func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error {

return f.client.Unsubscribe(ctx, newBlockSubscriber, newBlockEventQuery)
}

// IsRunning returns whether the Core client is running.
func (f *BlockFetcher) IsRunning() bool {
return f.client.IsRunning()
}
2 changes: 1 addition & 1 deletion das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (d *DASer) Stop(ctx context.Context) error {

// sampling validates availability for each Header received from header subscription.
func (d *DASer) sampling(ctx context.Context, sub header.Subscription) {
defer sub.Cancel()
defer sub.Cancel() //nolint:errcheck
defer close(d.done)
for {
h, err := sub.NextHeader(ctx)
Expand Down
2 changes: 1 addition & 1 deletion das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ func (mhs *mockHeaderSub) NextHeader(ctx context.Context) (*header.ExtendedHeade
return mhs.headers[0], nil
}

func (mhs *mockHeaderSub) Cancel() {}
func (mhs *mockHeaderSub) Cancel() error { return nil }

func (mhs *mockHeaderSub) Topic() *pubsub.Topic { return nil }
2 changes: 1 addition & 1 deletion node/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func lightComponents(cfg *Config, repo Repository) fx.Option {
)
}

// fullComponents keeps all the components as DI options required to built a Full Node.
// fullComponents keeps all the components as DI options required to build a Full Node.
func fullComponents(cfg *Config, repo Repository) fx.Option {
return fx.Options(
baseComponents(cfg, repo),
Expand Down
4 changes: 3 additions & 1 deletion node/full_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,12 @@ func TestFull_P2P_Streams(t *testing.T) {

func TestFull_WithRemoteCore(t *testing.T) {
// TODO(@Wondertan): Fix core
t.Skip("Skip until we fix core")

repo := MockRepository(t, DefaultConfig(Full))
remoteCore, protocol, ip := core.StartRemoteCore()
t.Cleanup(func() {
remoteCore.Stop() // nolint:errcheck
})
require.NotNil(t, remoteCore)
assert.True(t, remoteCore.IsRunning())

Expand Down
9 changes: 3 additions & 6 deletions node/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/node/fxutil"
"github.com/celestiaorg/celestia-node/service/block"
Expand All @@ -32,13 +31,13 @@ func HeaderSyncer(cfg Config) func(ex header.Exchange, store header.Store) (*hea
}

// HeaderService creates a new header.Service.
func HeaderService(lc fx.Lifecycle, syncer *header.Syncer, sub *pubsub.PubSub) (*header.Service, header.Broadcaster) {
func HeaderService(lc fx.Lifecycle, syncer *header.Syncer, sub *pubsub.PubSub) *header.Service {
service := header.NewHeaderService(syncer, sub)
lc.Append(fx.Hook{
OnStart: service.Start,
OnStop: service.Stop,
})
return service, service
return service
}

// HeaderExchangeP2P constructs new P2PExchange for headers.
Expand Down Expand Up @@ -78,11 +77,9 @@ func HeaderStore(ds datastore.Batching) (header.Store, error) {
// BlockService constructs new block.Service.
func BlockService(
lc fx.Lifecycle,
fetcher *core.BlockFetcher,
store ipld.DAGService,
broadcaster header.Broadcaster,
) *block.Service {
service := block.NewBlockService(fetcher, store, broadcaster)
service := block.NewBlockService(store)
lc.Append(fx.Hook{
OnStart: service.Start,
OnStop: service.Stop,
Expand Down
29 changes: 0 additions & 29 deletions service/block/erasure.go

This file was deleted.

105 changes: 0 additions & 105 deletions service/block/event.go

This file was deleted.

Loading