-
-
Notifications
You must be signed in to change notification settings - Fork 3.1k
/
Copy pathbitswap.go
219 lines (186 loc) · 5.98 KB
/
bitswap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package bitswap
import (
"errors"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service
func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageService, directory bsnet.Routing, d ds.Datastore) exchange.Interface {
networkAdapter := bsnet.NetMessageAdapter(s, nil)
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategy: strategy.New(),
routing: directory,
sender: networkAdapter,
wantlist: WantList{
data: make(map[u.Key]struct{}),
},
}
networkAdapter.SetDelegate(bs)
return bs
}
// bitswap instances implement the bitswap protocol.
type bitswap struct {
// sender delivers messages on behalf of the session
sender bsnet.Adapter
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// routing interface for communication
routing bsnet.Routing
notifications notifications.PubSub
// strategy listens to network traffic and makes decisions about how to
// interact with partners.
// TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy
wantlist WantList
}
type WantList struct {
lock sync.RWMutex
data map[u.Key]struct{}
}
func (wl *WantList) Add(k u.Key) {
u.DOut("Adding %v to Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()
wl.data[k] = struct{}{}
}
func (wl *WantList) Remove(k u.Key) {
u.DOut("Removing %v from Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()
delete(wl.data, k)
}
func (wl *WantList) Keys() []u.Key {
wl.lock.RLock()
defer wl.lock.RUnlock()
keys := make([]u.Key, 0)
for k, _ := range wl.data {
keys = append(keys, k)
}
return keys
}
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
//
// TODO ensure only one active request per key
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
u.DOut("Get Block %v\n", k.Pretty())
ctx, cancelFunc := context.WithCancel(parent)
bs.wantlist.Add(k)
promise := bs.notifications.Subscribe(ctx, k)
const maxProviders = 20
peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
go func() {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
message.AppendWanted(k)
for iiiii := range peersToQuery {
// u.DOut("bitswap got peersToQuery: %s\n", iiiii)
go func(p *peer.Peer) {
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
return
}
// FIXME ensure accounting is handled correctly when
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.strategy.MessageSent(p, message)
if response == nil {
return
}
bs.ReceiveMessage(ctx, p, response)
}(iiiii)
}
}()
select {
case block := <-promise:
cancelFunc()
bs.wantlist.Remove(k)
// TODO remove from wantlist
return &block, nil
case <-parent.Done():
return nil, parent.Err()
}
}
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
u.DOut("Has Block %v\n", blk.Key().Pretty())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key())
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
if p == nil {
return nil, nil, errors.New("Received nil Peer")
}
if incoming == nil {
return nil, nil, errors.New("Received nil Message")
}
bs.strategy.MessageReceived(p, incoming) // FIRST
for _, block := range incoming.Blocks() {
// TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil {
continue // FIXME(brian): err ignored
}
go bs.notifications.Publish(block)
go func(block blocks.Block) {
_ = bs.HasBlock(ctx, block) // FIXME err ignored
}(block)
}
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
for _, key := range incoming.Wantlist() {
if bs.strategy.ShouldSendBlockToPeer(key, p) {
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AppendBlock(*block)
}
}
}
defer bs.strategy.MessageSent(p, message)
return p, message, nil
}
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) {
bs.sender.SendMessage(ctx, p, m)
go bs.strategy.MessageSent(p, m)
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
u.DOut("Sending %v to peers that want it\n", block.Key().Pretty())
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
go bs.send(ctx, p, message)
}
}
}
}