-
-
Notifications
You must be signed in to change notification settings - Fork 3.1k
/
Copy pathbitswap.go
192 lines (159 loc) · 5.33 KB
/
bitswap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package bitswap
import (
"errors"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// Routing is the routing functionality bitswap relies on: looking up the
// peers that provide a key and announcing keys to the network.
// TODO rename -> Router?
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
// NOTE(review): the int and time.Duration parameters are unnamed. Block
// passes 20 and its own timeout here, so they are presumably a cap on the
// number of providers and a lookup timeout -- confirm with an implementation.
// TODO replace with timeout with context
FindProvidersAsync(u.Key, int, time.Duration) <-chan *peer.Peer
// Provide provides the key to the network
Provide(key u.Key) error
}
// TODO(brian): ensure messages are being received
// PartnerWantListMax is the bound for the number of keys we'll store per
// partner. These are usually taken from the top of the Partner's WantList
// advertisements. WantLists are sorted in terms of priority.
// NOTE(review): nothing in this file references the constant; presumably the
// limit is meant to be enforced elsewhere -- confirm.
const PartnerWantListMax = 10
// bitswap instances implement the bitswap protocol.
// Instances are built by NewSession, which populates every field below.
type bitswap struct {
// peer is the identity of this (local) node.
peer *peer.Peer
// sender delivers messages on behalf of the session
sender bsnet.NetworkAdapter
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// routing interface for communication
// It is used to find providers of a key (Block) and to announce blocks we
// hold (HasBlock).
routing Routing
// notifications connects block requests with block arrivals: getBlock
// subscribes for a key and ReceiveMessage publishes each incoming block.
notifications notifications.PubSub
// strategy listens to network traffic and makes decisions about how to
// interact with partners.
// TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy
}
// NewSession initializes a bitswap session.
//
// The session stores blocks in a blockstore built on d, identifies itself as
// p, uses directory for routing, and sends messages through a network adapter
// created from s. Incoming messages reach the session through a
// bsnet.Forwarder that delegates to it. The parent context is currently
// unused.
func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory Routing) exchange.Interface {
	// FIXME(brian): instantiate a concrete Strategist
	forwarder := &bsnet.Forwarder{}

	session := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
		notifications: notifications.New(),
		strategy:      strategy.New(),
		peer:          p,
		routing:       directory,
	}
	// The adapter needs the forwarder before the session can be attached to
	// it, so the forwarder starts without a delegate and is wired up last.
	session.sender = bsnet.NewNetworkAdapter(s, forwarder)
	forwarder.Delegate(session)

	return session
}
// Block attempts to retrieve a particular block from peers, within timeout.
//
// It asks routing for up to 20 providers of k and, as providers are
// discovered, requests the block from each of them concurrently. The first
// block to arrive is returned. If no block arrives before timeout elapses,
// u.ErrTimeout is returned.
func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
	*blocks.Block, error) {
	providers := bs.routing.FindProvidersAsync(k, 20, timeout)

	// Buffered so that the first successful fetch can always hand over its
	// result without blocking. The channel is deliberately never closed:
	// fetch goroutines may outlive this call, and sending on a closed channel
	// panics even from inside a select with a default case.
	blockChannel := make(chan *blocks.Block, 1)

	// done is closed when Block returns so that providers discovered
	// afterwards no longer trigger new requests.
	done := make(chan struct{})
	defer close(done)

	after := time.After(timeout)

	go func() {
		// Keep draining the provider channel even once done is closed, in
		// case its producer would otherwise block on a send.
		for p := range providers {
			select {
			case <-done:
				continue
			default:
			}
			go func(pr *peer.Peer) {
				blk, err := bs.getBlock(k, pr, timeout)
				if err != nil {
					return
				}
				select {
				case blockChannel <- blk:
				default:
					// A result has already been delivered; drop this one.
				}
			}(p)
		}
	}()

	select {
	case block := <-blockChannel:
		return block, nil
	case <-after:
		return nil, u.ErrTimeout
	}
}
// getBlock asks peer p for block k and waits for the block to be published
// on the notifications pubsub.
//
// It subscribes for k under a context that expires after timeout, sends p a
// message whose wantlist contains k, records the send with the strategy, and
// then blocks on the subscription channel. If that channel is closed without
// delivering a block, u.ErrTimeout is returned.
// NOTE(review): presumably the pubsub closes the channel when the context
// expires -- confirm in the notifications package.
func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	// Release the context's timer as soon as we are done instead of keeping
	// it alive until the deadline fires.
	defer cancel()

	blockChannel := bs.notifications.Subscribe(ctx, k)

	message := bsmsg.New()
	message.AppendWanted(k)

	// FIXME(brian): register the accountant on the service wrapper to ensure
	// that accounting is _always_ performed when SendMessage and
	// ReceiveMessage are called
	bs.sender.SendMessage(ctx, p, message)
	bs.strategy.MessageSent(p, message)

	block, ok := <-blockChannel
	if !ok {
		return nil, u.ErrTimeout
	}
	return &block, nil
}
// sendToPeersThatWant walks the peers known to the strategy and sends block,
// each in its own goroutine, to every peer for which the strategy reports
// both that the peer wants the block and that it should be sent.
func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
	for _, partner := range bs.strategy.Peers() {
		if !bs.strategy.BlockIsWantedByPeer(block.Key(), partner) {
			continue
		}
		if !bs.strategy.ShouldSendBlockToPeer(block.Key(), partner) {
			continue
		}
		go bs.send(partner, block)
	}
}
// HasBlock announces the existence of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
//
// Sending to interested peers happens in the background; the returned error
// is the result of providing the block's key through routing.
func (bs *bitswap) HasBlock(blk blocks.Block) error {
	go bs.sendToPeersThatWant(blk)

	key := blk.Key()
	return bs.routing.Provide(key)
}
// send wraps block b in a new message, sends it to peer p, and records the
// sent message with the strategy.
// TODO(brian): get a return value
func (bs *bitswap) send(p *peer.Peer, b blocks.Block) {
	msg := bsmsg.New()
	msg.AppendBlock(b)

	// FIXME(brian): pass ctx
	ctx := context.Background()
	bs.sender.SendMessage(ctx, p, msg)
	bs.strategy.MessageSent(p, msg)
}
// ReceiveMessage processes a bitswap message that arrived from sender.
//
// The message is first reported to the strategy. Every block it carries is
// then stored in the blockstore and published to local subscribers, each in
// its own goroutine. For every key on its wantlist that the strategy allows
// us to send and that the blockstore holds, the block is sent back to sender
// in the background.
//
// The return values are not implemented yet: the call always yields a nil
// peer, a nil message and a "TODO implement" error.
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(
	ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
	*peer.Peer, bsmsg.BitSwapMessage, error) {

	bs.strategy.MessageReceived(sender, incoming)

	// Blocks the sender gave us.
	if incoming.Blocks() != nil {
		for _, received := range incoming.Blocks() {
			go bs.blockstore.Put(received) // FIXME(brian): err ignored
			go bs.notifications.Publish(received)
		}
	}

	// Keys the sender is asking us for.
	if incoming.Wantlist() != nil {
		for _, wanted := range incoming.Wantlist() {
			if !bs.strategy.ShouldSendBlockToPeer(wanted, sender) {
				continue
			}
			found, err := bs.blockstore.Get(wanted)
			if err != nil {
				// TODO(brian): log/return the error
				continue
			}
			go bs.send(sender, *found)
		}
	}

	return nil, nil, errors.New("TODO implement")
}
// numBytes reports the length, in bytes, of the block's Data.
func numBytes(b blocks.Block) int {
	size := len(b.Data)
	return size
}