This repository was archived by the owner on Oct 5, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathdht.go
114 lines (99 loc) · 2.52 KB
/
dht.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package httpapi
import (
	"context"
	"encoding/json"
	"errors"

	"github.com/ipfs/go-ipfs/core/coreapi/interface"
	caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
	"github.com/libp2p/go-libp2p-peer"
	"github.com/libp2p/go-libp2p-peerstore"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
)
// DhtAPI is a named type whose underlying type is HttpApi. Its methods in this
// file issue "dht/..." requests through the *HttpApi returned by core().
type DhtAPI HttpApi
// FindPeer sends a "dht/findpeer" request for the peer p and returns the first
// entry of the FinalPeer event found in the response stream.
//
// The response body is decoded as a sequence of JSON events; events whose Type
// is not notif.FinalPeer are skipped. An error is returned when:
//   - sending the request fails,
//   - the response carries an error (resp.Error),
//   - decoding fails or the stream ends before a FinalPeer event is seen (the
//     decoder's error is returned unchanged), or
//   - the FinalPeer event contains no responses.
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peerstore.PeerInfo, error) {
	resp, err := api.core().request("dht/findpeer", p.Pretty()).Send(ctx)
	if err != nil {
		return peerstore.PeerInfo{}, err
	}
	if resp.Error != nil {
		return peerstore.PeerInfo{}, resp.Error
	}
	defer resp.Close()

	dec := json.NewDecoder(resp.Output)
	for {
		// Use a fresh value for every event: Decode leaves fields that are
		// absent from the JSON input untouched, so a reused struct could
		// carry Responses over from an earlier event.
		var out struct {
			Type      notif.QueryEventType
			Responses []peerstore.PeerInfo
		}
		if err := dec.Decode(&out); err != nil {
			return peerstore.PeerInfo{}, err
		}
		if out.Type != notif.FinalPeer {
			continue
		}
		if len(out.Responses) == 0 {
			// Indexing Responses[0] on an empty final event would panic;
			// report it as an error instead.
			return peerstore.PeerInfo{}, errors.New("dht/findpeer: final peer event contained no responses")
		}
		return out.Responses[0], nil
	}
}
// FindProviders sends a "dht/findprovs" request for the CID that p resolves to
// and returns a channel on which the reported providers are delivered.
//
// Option parsing, path resolution and the request itself happen synchronously;
// any failure there (including a non-nil resp.Error) is returned with a nil
// channel. After that a goroutine decodes the response body as a stream of
// JSON events and forwards every entry of each Provider event to the channel.
// The goroutine stops, closing the channel and the response, when decoding
// fails, when a QueryError event arrives, or when ctx is done while a send is
// pending. None of those conditions is reported to the caller.
func (api *DhtAPI) FindProviders(ctx context.Context, p iface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peerstore.PeerInfo, error) {
	// queryEvent is the shape of one JSON event in the response stream.
	type queryEvent struct {
		Extra     string
		Type      notif.QueryEventType
		Responses []peerstore.PeerInfo
	}

	settings, err := caopts.DhtFindProvidersOptions(opts...)
	if err != nil {
		return nil, err
	}

	resolved, err := api.core().ResolvePath(ctx, p)
	if err != nil {
		return nil, err
	}

	req := api.core().request("dht/findprovs", resolved.Cid().String())
	resp, err := req.Option("num-providers", settings.NumProviders).Send(ctx)
	if err != nil {
		return nil, err
	}
	if resp.Error != nil {
		return nil, resp.Error
	}

	providers := make(chan peerstore.PeerInfo)
	go func() {
		// Deferred calls run in reverse order: the channel is closed first,
		// then the response.
		defer resp.Close()
		defer close(providers)

		decoder := json.NewDecoder(resp.Output)
		for {
			// A new event value per iteration, so nothing carries over
			// between decoded events.
			var ev queryEvent
			if decodeErr := decoder.Decode(&ev); decodeErr != nil {
				return // todo: handle this somehow
			}

			switch {
			case ev.Type == notif.QueryError:
				// usually a 'not found' error
				// todo: handle other errors
				return
			case ev.Type == notif.Provider:
				for i := range ev.Responses {
					select {
					case providers <- ev.Responses[i]:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return providers, nil
}
// Provide sends a "dht/provide" request for the CID that p resolves to,
// passing the parsed Recursive setting as the "recursive" option.
//
// It returns the first error encountered while parsing opts, resolving p, or
// executing the request; the request is executed with a nil result argument.
func (api *DhtAPI) Provide(ctx context.Context, p iface.Path, opts ...caopts.DhtProvideOption) error {
	settings, err := caopts.DhtProvideOptions(opts...)
	if err != nil {
		return err
	}

	resolved, err := api.core().ResolvePath(ctx, p)
	if err != nil {
		return err
	}

	req := api.core().request("dht/provide", resolved.Cid().String())
	withOpts := req.Option("recursive", settings.Recursive)
	return withOpts.Exec(ctx, nil)
}
// core converts the receiver back to *HttpApi, the pointer to DhtAPI's
// underlying type, so the other methods can reach HttpApi's own methods.
func (api *DhtAPI) core() *HttpApi {
	base := (*HttpApi)(api)
	return base
}