Skip to content

Commit

Permalink
fix doubleGet issue caused by hasblock not announcing
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
whyrusleeping committed Apr 28, 2016
1 parent a5782ef commit 01d1b69
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 23 deletions.
2 changes: 2 additions & 0 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {

bs.notifications.Publish(blk)

bs.engine.AddBlock(blk)

select {
case bs.newBlocks <- blk:
// send block off to be reprovided
Expand Down
2 changes: 2 additions & 0 deletions exchange/bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ func TestDoubleGet(t *testing.T) {
t.Fatal(err)
}

// ensure both requests make it into the wantlist at the same time
time.Sleep(time.Millisecond * 100)
cancel1()

_, ok := <-blkch1
Expand Down
36 changes: 26 additions & 10 deletions exchange/bitswap/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Engine struct {

bs bstore.Blockstore

lock sync.RWMutex // protects the fields immediatly below
lock sync.Mutex // protects the fields immediatly below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
}
Expand Down Expand Up @@ -178,8 +178,8 @@ func (e *Engine) Outbox() <-chan (<-chan *Envelope) {

// Returns a slice of Peers with whom the local node has active sessions
func (e *Engine) Peers() []peer.ID {
e.lock.RLock()
defer e.lock.RUnlock()
e.lock.Lock()
defer e.lock.Unlock()

response := make([]peer.ID, 0)
for _, ledger := range e.ledgerMap {
Expand Down Expand Up @@ -217,7 +217,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
e.peerRequestQueue.Remove(entry.Key, p)
} else {
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Ctx, entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
newWorkExists = true
Expand All @@ -228,16 +228,32 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
newWorkExists = true
}
}
}
return nil
}

func (e *Engine) addBlock(block *blocks.Block) {
work := false

for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
}

if work {
e.signalNewWork()
}
}

func (e *Engine) AddBlock(block *blocks.Block) {
e.lock.Lock()
defer e.lock.Unlock()

e.addBlock(block)
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
Expand Down
7 changes: 2 additions & 5 deletions exchange/bitswap/decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"

"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

// keySet is just a convenient alias for maps of keys, where we only care
Expand Down Expand Up @@ -69,10 +67,9 @@ func (l *ledger) ReceivedBytes(n int) {
l.Accounting.BytesRecv += uint64(n)
}

// TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(ctx context.Context, k key.Key, priority int) {
func (l *ledger) Wants(k key.Key, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(ctx, k, priority)
l.wantList.Add(k, priority)
}

func (l *ledger) CancelWant(k key.Key) {
Expand Down
33 changes: 25 additions & 8 deletions exchange/bitswap/wantlist/wantlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ type Wantlist struct {
}

type Entry struct {
// TODO consider making entries immutable so they can be shared safely and
// slices can be copied efficiently.
Key key.Key
Priority int
Ctx context.Context

Ctx context.Context
cancel func()
RefCnt int
}

type entrySlice []Entry
Expand All @@ -47,10 +48,10 @@ func New() *Wantlist {
}
}

func (w *ThreadSafe) Add(ctx context.Context, k key.Key, priority int) {
func (w *ThreadSafe) Add(k key.Key, priority int) {
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.Add(ctx, k, priority)
w.Wantlist.Add(k, priority)
}

func (w *ThreadSafe) AddEntry(e Entry) {
Expand Down Expand Up @@ -93,14 +94,19 @@ func (w *Wantlist) Len() int {
return len(w.set)
}

func (w *Wantlist) Add(ctx context.Context, k key.Key, priority int) {
if _, ok := w.set[k]; ok {
func (w *Wantlist) Add(k key.Key, priority int) {
if e, ok := w.set[k]; ok {
e.RefCnt++
return
}

ctx, cancel := context.WithCancel(context.Background())
w.set[k] = Entry{
Key: k,
Priority: priority,
Ctx: ctx,
cancel: cancel,
RefCnt: 1,
}
}

Expand All @@ -112,7 +118,18 @@ func (w *Wantlist) AddEntry(e Entry) {
}

func (w *Wantlist) Remove(k key.Key) {
delete(w.set, k)
e, ok := w.set[k]
if !ok {
return
}

e.RefCnt--
if e.RefCnt <= 0 {
delete(w.set, k)
if e.cancel != nil {
e.cancel()
}
}
}

func (w *Wantlist) Contains(k key.Key) (Entry, bool) {
Expand Down

0 comments on commit 01d1b69

Please sign in to comment.