From efd629ca147228fa50d79d684a81b7b5fe152c5b Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 17 May 2019 12:37:34 -0700 Subject: [PATCH 1/2] Introduce functional option for enabling/disabling provide This commit was moved from ipfs/go-bitswap@0bae16c6cbb946fa35fa215385b31d8a95ec9daa --- bitswap/bitswap.go | 54 ++++++++++++++++++---------- bitswap/bitswap_test.go | 14 ++++---- bitswap/testinstance/testinstance.go | 27 +++++++------- bitswap/workers.go | 2 +- 4 files changed, 56 insertions(+), 41 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 4a407feba..6213627af 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -42,10 +42,6 @@ const ( ) var ( - // ProvideEnabled is a variable that tells Bitswap whether or not - // to handle providing blocks (see experimental provider system) - ProvideEnabled = true - // HasBlockBufferSize is the buffer size of the channel for new blocks // that need to be provided. They should get pulled over by the // provideCollector even before they are actually provided. @@ -58,11 +54,22 @@ var ( metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} ) +// Option defines the functional option type that can be used to configure +// bitswap instances +type Option func(*Bitswap) + +// ProvideEnabled is an option for enabling/disabling provide announcements +func ProvideEnabled(enabled bool) Option { + return func(bs *Bitswap) { + bs.provideEnabled = enabled + } +} + // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network // delegate. Runs until context is cancelled or bitswap.Close is called. func New(parent context.Context, network bsnet.BitSwapNetwork, - bstore blockstore.Blockstore) exchange.Interface { + bstore blockstore.Blockstore, options ...Option) exchange.Interface { // important to use provided parent context (since it may include important // loggable data). It's probably not a good idea to allow bitswap to be @@ -103,19 +110,25 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, } bs := &Bitswap{ - blockstore: bstore, - engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method - network: network, - process: px, - newBlocks: make(chan cid.Cid, HasBlockBufferSize), - provideKeys: make(chan cid.Cid, provideKeysBufferSize), - wm: wm, - pqm: pqm, - sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory), - counters: new(counters), - dupMetric: dupHist, - allMetric: allHist, - sentHistogram: sentHistogram, + blockstore: bstore, + engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method + network: network, + process: px, + newBlocks: make(chan cid.Cid, HasBlockBufferSize), + provideKeys: make(chan cid.Cid, provideKeysBufferSize), + wm: wm, + pqm: pqm, + sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory), + counters: new(counters), + dupMetric: dupHist, + allMetric: allHist, + sentHistogram: sentHistogram, + provideEnabled: true, + } + + // apply functional options before starting and running bitswap + for _, option := range options { + option(bs) } bs.wm.Startup() @@ -174,6 +187,9 @@ type Bitswap struct { // the sessionmanager manages tracking sessions sm *bssm.SessionManager + + // whether or not to make provide announcements + provideEnabled bool } type counters struct { @@ -253,7 +269,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { bs.engine.AddBlock(blk) - if ProvideEnabled { + if bs.provideEnabled { select { case bs.newBlocks <- blk.Cid(): // send block off to be reprovided diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index c1d059b4c..ce13ec68d 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -102,27 +102,25 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - bitswap.ProvideEnabled = false - defer func() { bitswap.ProvideEnabled = true }() - + bssession.SetProviderSearchDelay(10 * time.Millisecond) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - ig := testinstance.NewTestInstanceGenerator(net) + ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false)) defer ig.Close() hasBlock := ig.Next() defer hasBlock.Exchange.Close() + wantsBlock := ig.Next() + defer wantsBlock.Exchange.Close() + if err := hasBlock.Exchange.HasBlock(block); err != nil { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() - wantsBlock := ig.Next() - defer wantsBlock.Exchange.Close() - ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session) // set find providers delay to less than timeout context of this test ns.SetBaseTickDelay(10 * time.Millisecond) diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index f459065fc..bd61b90ed 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -19,23 +19,24 @@ import ( // NewTestInstanceGenerator generates a new InstanceGenerator for the given // testnet -func NewTestInstanceGenerator( - net tn.Network) InstanceGenerator { +func NewTestInstanceGenerator(net tn.Network, bsOptions ...bitswap.Option) InstanceGenerator { ctx, cancel := context.WithCancel(context.Background()) return InstanceGenerator{ - net: net, - seq: 0, - ctx: ctx, // TODO take ctx as param to Next, Instances - cancel: cancel, + net: net, + seq: 0, + ctx: ctx, // TODO take ctx as param to Next, Instances + cancel: cancel, + bsOptions: bsOptions, } } // InstanceGenerator generates new test instances of bitswap+dependencies type InstanceGenerator struct { - seq int - net tn.Network - ctx context.Context - cancel context.CancelFunc + seq int + net tn.Network + ctx context.Context + cancel context.CancelFunc + bsOptions []bitswap.Option } // Close closes the clobal context, shutting down all test instances @@ -51,7 +52,7 @@ func (g *InstanceGenerator) Next() Instance { if err != nil { panic("FIXME") // TODO change signature } - return NewInstance(g.ctx, g.net, p) + return NewInstance(g.ctx, g.net, p, g.bsOptions...) } // Instances creates N test instances of bitswap + dependencies @@ -95,7 +96,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // instances. To safeguard, use the InstanceGenerator to generate instances. It's // just a much better idea. -func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Instance { +func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity, options ...bitswap.Option) Instance { bsdelay := delay.Fixed(0) adapter := net.Adapter(p) @@ -108,7 +109,7 @@ func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Insta panic(err.Error()) // FIXME perhaps change signature and return error. } - bs := bitswap.New(ctx, adapter, bstore).(*bitswap.Bitswap) + bs := bitswap.New(ctx, adapter, bstore, options...).(*bitswap.Bitswap) return Instance{ Adapter: adapter, diff --git a/bitswap/workers.go b/bitswap/workers.go index 4a6e91dd6..fb3dc019f 100644 --- a/bitswap/workers.go +++ b/bitswap/workers.go @@ -25,7 +25,7 @@ func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) { }) } - if ProvideEnabled { + if bs.provideEnabled { // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { bs.provideCollector(ctx) From 3f70d374bae5f12fb7fffb79db2767f48ad43ec0 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Mon, 20 May 2019 11:11:16 -0700 Subject: [PATCH 2/2] Fixup timing; Unset ProviderSearchDelay at test exit This commit was moved from ipfs/go-bitswap@94b505a64229ec01b3c6be432a83daac5f955c69 --- bitswap/bitswap_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index ce13ec68d..fd3066abc 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -102,7 +102,8 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - bssession.SetProviderSearchDelay(10 * time.Millisecond) + bssession.SetProviderSearchDelay(50 * time.Millisecond) + defer bssession.SetProviderSearchDelay(time.Second) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false)) @@ -118,12 +119,10 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond) defer cancel() ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session) - // set find providers delay to less than timeout context of this test - ns.SetBaseTickDelay(10 * time.Millisecond) received, err := ns.GetBlock(ctx, block.Cid()) if received != nil {