wire contexts into bitswap requests more deeply #2437

Merged: 3 commits, Apr 29, 2016

Changes from all commits
4 changes: 4 additions & 0 deletions blockservice/blockservice.go
@@ -122,6 +122,10 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *blocks.Block
}
}

if len(misses) == 0 {
return
}

rblocks, err := s.Exchange.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
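
A minimal, self-contained sketch of the guard added above (the types and names here are illustrative stand-ins, not the go-ipfs API): when every requested key is already in the local blockstore, `GetBlocks` now returns before touching the exchange at all.

```go
package main

import "fmt"

// getBlocks models BlockService.GetBlocks: serve local hits first, then
// fall through to the exchange only if some keys were missed.
func getBlocks(local map[string]string, ks []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		var misses []string
		for _, k := range ks {
			if b, ok := local[k]; ok {
				out <- b
				continue
			}
			misses = append(misses, k)
		}
		if len(misses) == 0 {
			return // the new guard: no exchange round trip for fully local requests
		}
		// ...otherwise this would call Exchange.GetBlocks(ctx, misses)...
	}()
	return out
}

func main() {
	local := map[string]string{"k1": "block-1", "k2": "block-2"}
	for b := range getBlocks(local, []string{"k1", "k2"}) {
		fmt.Println(b)
	}
}
```
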
53 changes: 19 additions & 34 deletions exchange/bitswap/bitswap.go
@@ -86,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize),
@@ -129,7 +129,7 @@ type Bitswap struct {
notifications notifications.PubSub

// send keys to a worker to find and connect to providers for them
findKeys chan *blockRequest
findKeys chan *wantlist.Entry

engine *decision.Engine

@@ -146,8 +146,8 @@ type Bitswap struct {
}

type blockRequest struct {
keys []key.Key
ctx context.Context
key key.Key
ctx context.Context
}

// GetBlock attempts to retrieve a particular block from peers within the
@@ -208,6 +208,12 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
// resources, provide a context with a reasonably short deadline (i.e. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
if len(keys) == 0 {
out := make(chan *blocks.Block)
close(out)
return out, nil
}

select {
case <-bs.process.Closing():
return nil, errors.New("bitswap is closed")
@@ -219,11 +225,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
}

bs.wm.WantBlocks(keys)
bs.wm.WantBlocks(ctx, keys)

req := &blockRequest{
keys: keys,
ctx: ctx,
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in
// almost every situation; later, this assumption may not hold.
req := &wantlist.Entry{
Key: keys[0],
Ctx: ctx,
}
select {
case bs.findKeys <- req:
@@ -255,6 +264,8 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {

bs.notifications.Publish(blk)

bs.engine.AddBlock(blk)

select {
case bs.newBlocks <- blk:
// send block off to be reprovided
@@ -276,32 +287,6 @@ func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
return err
}

func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range entries {
wg.Add(1)
go func(k key.Key) {
defer wg.Done()

child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
go func(p peer.ID) {
bs.network.ConnectTo(ctx, p)
}(prov)
}
}(e.Key)
}

wg.Wait() // make sure all our children do finish.
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transferred.
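
A hypothetical, condensed model of the new empty-request fast path in `GetBlocks` above (types here are stand-ins for the real bitswap ones): zero keys yields an already-closed channel, so callers ranging over the result exit immediately instead of hanging on a want that can never be fulfilled.

```go
package main

import "fmt"

type block struct{ key string }

// getBlocks models Bitswap.GetBlocks with the new guard for empty key
// slices; the non-empty branch is stubbed out for illustration.
func getBlocks(keys []string) (<-chan *block, error) {
	if len(keys) == 0 {
		out := make(chan *block)
		close(out) // caller's range loop exits at once
		return out, nil
	}
	// The real implementation registers wants via wm.WantBlocks(ctx, keys)
	// and queues a provider lookup keyed on keys[0], per the NB comment above.
	out := make(chan *block, len(keys))
	for _, k := range keys {
		out <- &block{key: k}
	}
	close(out)
	return out, nil
}

func main() {
	ch, _ := getBlocks(nil)
	for range ch {
		fmt.Println("never reached for empty requests")
	}
	fmt.Println("done")
}
```
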
54 changes: 54 additions & 0 deletions exchange/bitswap/bitswap_test.go
@@ -308,3 +308,57 @@ func TestBasicBitswap(t *testing.T) {
}
}
}

func TestDoubleGet(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()

t.Log("Test a one node trying to get one block from another")

instances := sg.Instances(2)
blocks := bg.Blocks(1)

ctx1, cancel1 := context.WithCancel(context.Background())

blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
if err != nil {
t.Fatal(err)
}

ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()

blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
if err != nil {
t.Fatal(err)
}

// ensure both requests make it into the wantlist at the same time
time.Sleep(time.Millisecond * 100)
cancel1()

_, ok := <-blkch1
if ok {
t.Fatal("expected channel to be closed")
}

err = instances[0].Exchange.HasBlock(blocks[0])
if err != nil {
t.Fatal(err)
}

blk, ok := <-blkch2
if !ok {
t.Fatal("expected to get the block here")
}
t.Log(blk)

for _, inst := range instances {
err := inst.Exchange.Close()
if err != nil {
t.Fatal(err)
}
}
}
34 changes: 25 additions & 9 deletions exchange/bitswap/decision/engine.go
@@ -83,7 +83,7 @@ type Engine struct {

bs bstore.Blockstore

lock sync.RWMutex // protects the fields immediatly below
lock sync.Mutex // protects the fields immediately below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
}
@@ -178,8 +178,8 @@ func (e *Engine) Outbox() <-chan (<-chan *Envelope) {

// Returns a slice of Peers with whom the local node has active sessions
func (e *Engine) Peers() []peer.ID {
e.lock.RLock()
defer e.lock.RUnlock()
e.lock.Lock()
defer e.lock.Unlock()

response := make([]peer.ID, 0)
for _, ledger := range e.ledgerMap {
@@ -228,16 +228,32 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
newWorkExists = true
}
}
}
return nil
}

func (e *Engine) addBlock(block *blocks.Block) {
work := false

for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
}

if work {
e.signalNewWork()
}
}

func (e *Engine) AddBlock(block *blocks.Block) {
e.lock.Lock()
defer e.lock.Unlock()

e.addBlock(block)
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
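
A hypothetical, condensed model of the `AddBlock` path above (`engine` and `peerLedger` are stand-ins for the real decision package types): when a block arrives via `HasBlock`, the engine scans every partner's wantlist under the lock and signals work for each peer that asked for it.

```go
package main

import (
	"fmt"
	"sync"
)

type peerLedger struct {
	partner string
	wants   map[string]bool
}

type engine struct {
	lock    sync.Mutex // protects ledgers, as in the real Engine
	ledgers []*peerLedger
	work    chan string // peers that now have sendable work
}

// AddBlock mirrors the new Engine.AddBlock: take the lock, find every
// peer whose wantlist contains the block, and signal new work for them.
func (e *engine) AddBlock(k string) {
	e.lock.Lock()
	defer e.lock.Unlock()

	for _, l := range e.ledgers {
		if l.wants[k] {
			select {
			case e.work <- l.partner:
			default: // never block the caller on a full queue
			}
		}
	}
}

func main() {
	e := &engine{
		ledgers: []*peerLedger{
			{partner: "peerA", wants: map[string]bool{"QmFoo": true}},
			{partner: "peerB", wants: map[string]bool{}},
		},
		work: make(chan string, 8),
	}
	e.AddBlock("QmFoo")
	fmt.Println("work for:", <-e.work) // work for: peerA
}
```
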
1 change: 0 additions & 1 deletion exchange/bitswap/decision/ledger.go
@@ -67,7 +67,6 @@ func (l *ledger) ReceivedBytes(n int) {
l.Accounting.BytesRecv += uint64(n)
}

// TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k key.Key, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority)
49 changes: 40 additions & 9 deletions exchange/bitswap/wantlist/wantlist.go
@@ -3,9 +3,12 @@
package wantlist

import (
key "github.com/ipfs/go-ipfs/blocks/key"
"sort"
"sync"

key "github.com/ipfs/go-ipfs/blocks/key"

"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

type ThreadSafe struct {
@@ -16,14 +19,15 @@ type ThreadSafe struct {
// not threadsafe
type Wantlist struct {
set map[key.Key]Entry
// TODO provide O(1) len accessor if cost becomes an issue
}

type Entry struct {
// TODO consider making entries immutable so they can be shared safely and
// slices can be copied efficiently.
Key key.Key
Priority int

Ctx context.Context
cancel func()
RefCnt int
}

type entrySlice []Entry
@@ -45,21 +49,24 @@ func New() *Wantlist {
}

func (w *ThreadSafe) Add(k key.Key, priority int) {
// TODO rm defer for perf
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.Add(k, priority)
}

func (w *ThreadSafe) AddEntry(e Entry) {
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.AddEntry(e)
}

func (w *ThreadSafe) Remove(k key.Key) {
// TODO rm defer for perf
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.Remove(k)
}

func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
// TODO rm defer for perf
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Contains(k)
@@ -88,17 +95,41 @@ func (w *Wantlist) Len() int {
}

func (w *Wantlist) Add(k key.Key, priority int) {
if _, ok := w.set[k]; ok {
if e, ok := w.set[k]; ok {
e.RefCnt++
w.set[k] = e // entries are stored by value; write the bumped count back
return
}

ctx, cancel := context.WithCancel(context.Background())
w.set[k] = Entry{
Key: k,
Priority: priority,
Ctx: ctx,
cancel: cancel,
RefCnt: 1,
}
}

func (w *Wantlist) AddEntry(e Entry) {
if ex, ok := w.set[e.Key]; ok {
ex.RefCnt++
w.set[e.Key] = ex
return
}
w.set[e.Key] = e
}

func (w *Wantlist) Remove(k key.Key) {
delete(w.set, k)
e, ok := w.set[k]
if !ok {
return
}

e.RefCnt--
if e.RefCnt <= 0 {
delete(w.set, k)
if e.cancel != nil {
e.cancel()
}
return
}
w.set[k] = e // persist the decremented refcount
}

func (w *Wantlist) Contains(k key.Key) (Entry, bool) {
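
A self-contained sketch of the refcounting semantics introduced above, using pointer entries to sidestep the map-of-values write-back noted in the diff (names are illustrative): a key stays wanted until every outstanding request for it has been removed, and its per-want context is cancelled only then.

```go
package main

import (
	"context"
	"fmt"
)

// entry is a stand-in for wantlist.Entry, stored by pointer so refcount
// updates need no explicit write-back.
type entry struct {
	refCnt int
	cancel context.CancelFunc
}

type wantlist struct{ set map[string]*entry }

func (w *wantlist) add(k string) {
	if e, ok := w.set[k]; ok {
		e.refCnt++ // key already wanted: just count the extra request
		return
	}
	_, cancel := context.WithCancel(context.Background())
	w.set[k] = &entry{refCnt: 1, cancel: cancel}
}

func (w *wantlist) remove(k string) {
	e, ok := w.set[k]
	if !ok {
		return
	}
	e.refCnt--
	if e.refCnt <= 0 {
		delete(w.set, k)
		e.cancel() // release the per-want context only when fully unwanted
	}
}

func main() {
	w := &wantlist{set: make(map[string]*entry)}
	w.add("QmX")
	w.add("QmX") // a second GetBlocks call wanting the same key
	w.remove("QmX")
	fmt.Println("entries:", len(w.set)) // entries: 1 (still wanted once)
	w.remove("QmX")
	fmt.Println("entries:", len(w.set)) // entries: 0 (fully released)
}
```
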
19 changes: 14 additions & 5 deletions exchange/bitswap/wantmanager.go
@@ -64,23 +64,24 @@ type msgQueue struct {
done chan struct{}
}

func (pm *WantManager) WantBlocks(ks []key.Key) {
func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ks, false)
pm.addEntries(ctx, ks, false)
}

func (pm *WantManager) CancelWants(ks []key.Key) {
pm.addEntries(ks, true)
pm.addEntries(context.TODO(), ks, true)
}

func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.Entry{
Key: k,
Priority: kMaxPriority - i,
Ctx: ctx,
},
})
}
@@ -224,7 +225,7 @@ func (pm *WantManager) Run() {
if e.Cancel {
pm.wl.Remove(e.Key)
} else {
pm.wl.Add(e.Key, e.Priority)
pm.wl.AddEntry(e.Entry)
}
}

@@ -237,6 +238,14 @@
// resend entire wantlist every so often (REALLY SHOULDN'T BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
select {
case <-e.Ctx.Done():
// entry has been cancelled
// simply continue, the entry will be removed from the
// wantlist soon enough
continue
default:
}
es = append(es, &bsmsg.Entry{Entry: e})
}
for _, p := range pm.peers {
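
A minimal, runnable sketch of the rebroadcast filter added to `Run` above (the `entry` type here is a stand-in for `wantlist.Entry` with its new `Ctx` field): a non-blocking select on each entry's context skips cancelled wants without stalling the resend loop.

```go
package main

import (
	"context"
	"fmt"
)

type entry struct {
	key string
	ctx context.Context
}

// liveEntries applies the same filter as the periodic resend: entries
// whose context is already done are dropped from the rebroadcast; the
// wantlist itself removes them separately.
func liveEntries(all []entry) []entry {
	var es []entry
	for _, e := range all {
		select {
		case <-e.ctx.Done():
			continue // cancelled; the wantlist will drop it soon enough
		default:
		}
		es = append(es, e)
	}
	return es
}

func main() {
	active := context.Background()
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()

	all := []entry{{"QmA", active}, {"QmB", cancelled}}
	for _, e := range liveEntries(all) {
		fmt.Println("resend:", e.key) // only QmA is rebroadcast
	}
}
```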