Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #124 from ipfs/experiment/provider-system-no-block…
Browse files Browse the repository at this point in the history
…s-no-static-config

Change bitswap provide toggle to not be static
  • Loading branch information
michaelavila authored May 20, 2019
2 parents a32fa8a + 94b505a commit 07a235d
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 43 deletions.
54 changes: 35 additions & 19 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ const (
)

var (
// ProvideEnabled is a variable that tells Bitswap whether or not
// to handle providing blocks (see experimental provider system)
ProvideEnabled = true

// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
Expand All @@ -58,11 +54,22 @@ var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Bitswap)

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Bitswap) {
bs.provideEnabled = enabled
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore) exchange.Interface {
bstore blockstore.Blockstore, options ...Option) exchange.Interface {

// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
Expand Down Expand Up @@ -103,19 +110,25 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

bs := &Bitswap{
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
}

// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
}

bs.wm.Startup()
Expand Down Expand Up @@ -174,6 +187,9 @@ type Bitswap struct {

// the sessionmanager manages tracking sessions
sm *bssm.SessionManager

// whether or not to make provide announcements
provideEnabled bool
}

type counters struct {
Expand Down Expand Up @@ -253,7 +269,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {

bs.engine.AddBlock(blk)

if ProvideEnabled {
if bs.provideEnabled {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
Expand Down
17 changes: 7 additions & 10 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,27 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
bitswap.ProvideEnabled = false
defer func() { bitswap.ProvideEnabled = true }()

bssession.SetProviderSearchDelay(50 * time.Millisecond)
defer bssession.SetProviderSearchDelay(time.Second)
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
defer ig.Close()

hasBlock := ig.Next()
defer hasBlock.Exchange.Close()

wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()

wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)
// set find providers delay to less than timeout context of this test
ns.SetBaseTickDelay(10 * time.Millisecond)

received, err := ns.GetBlock(ctx, block.Cid())
if received != nil {
Expand Down
27 changes: 14 additions & 13 deletions testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@ import (

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
// testnet
func NewTestInstanceGenerator(
net tn.Network) InstanceGenerator {
func NewTestInstanceGenerator(net tn.Network, bsOptions ...bitswap.Option) InstanceGenerator {
ctx, cancel := context.WithCancel(context.Background())
return InstanceGenerator{
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
bsOptions: bsOptions,
}
}

// InstanceGenerator generates new test instances of bitswap+dependencies
type InstanceGenerator struct {
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
bsOptions []bitswap.Option
}

// Close closes the clobal context, shutting down all test instances
Expand All @@ -51,7 +52,7 @@ func (g *InstanceGenerator) Next() Instance {
if err != nil {
panic("FIXME") // TODO change signature
}
return NewInstance(g.ctx, g.net, p)
return NewInstance(g.ctx, g.net, p, g.bsOptions...)
}

// Instances creates N test instances of bitswap + dependencies
Expand Down Expand Up @@ -95,7 +96,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity, options ...bitswap.Option) Instance {
bsdelay := delay.Fixed(0)

adapter := net.Adapter(p)
Expand All @@ -108,7 +109,7 @@ func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Insta
panic(err.Error()) // FIXME perhaps change signature and return error.
}

bs := bitswap.New(ctx, adapter, bstore).(*bitswap.Bitswap)
bs := bitswap.New(ctx, adapter, bstore, options...).(*bitswap.Bitswap)

return Instance{
Adapter: adapter,
Expand Down
2 changes: 1 addition & 1 deletion workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) {
})
}

if ProvideEnabled {
if bs.provideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
Expand Down

0 comments on commit 07a235d

Please sign in to comment.