Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] coreapi: pubsub, dht, swarm #4774

Closed
wants to merge 11 commits into from
27 changes: 22 additions & 5 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,56 @@ func NewCoreAPI(n *core.IpfsNode) coreiface.CoreAPI {
return api
}

// Unixfs returns the UnixfsAPI interface backed by the go-ipfs node
// Unixfs returns the UnixfsAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Unixfs() coreiface.UnixfsAPI {
return (*UnixfsAPI)(api)
}

// Block returns the BlockAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Block() coreiface.BlockAPI {
return &BlockAPI{api, nil}
}

// Dag returns the DagAPI interface backed by the go-ipfs node
// Dag returns the DagAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Dag() coreiface.DagAPI {
return &DagAPI{api, nil}
}

// Name returns the NameAPI interface backed by the go-ipfs node
// Name returns the NameAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Name() coreiface.NameAPI {
return &NameAPI{api, nil}
}

// Key returns the KeyAPI interface backed by the go-ipfs node
// Key returns the KeyAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Key() coreiface.KeyAPI {
return &KeyAPI{api, nil}
}

//Object returns the ObjectAPI interface backed by the go-ipfs node
// Object returns the ObjectAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Object() coreiface.ObjectAPI {
return &ObjectAPI{api, nil}
}

// Pin returns the PinAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Pin() coreiface.PinAPI {
return &PinAPI{api, nil}
}

// Dht returns the DhtAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Dht() coreiface.DhtAPI {
return &DhtAPI{api, nil}
}

// PubSub returns the DhtAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
return &PubSubAPI{api, nil}
}

// PubSub returns the DhtAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Swarm() coreiface.SwarmAPI {
return &SwarmAPI{api}
}

// ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreiface.Node, error) {
Expand Down
239 changes: 239 additions & 0 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package coreapi

import (
"context"
"errors"
"fmt"

coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
dag "github.com/ipfs/go-ipfs/merkledag"

routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing"
notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications"
ipdht "gx/ipfs/QmVSep2WwKcXxMonPASsAJ3nZVjfVMKgMcaSigxKnUWpJv/go-libp2p-kad-dht"
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

var ErrNotDHT = errors.New("routing service is not a DHT")

type DhtAPI struct {
*CoreAPI
*caopts.DhtOptions
}

func (api *DhtAPI) FindPeer(ctx context.Context, p coreiface.PeerID) (<-chan coreiface.Addr, error) {
dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}

outChan := make(chan coreiface.Addr)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)

go func() {
defer close(outChan)

sendAddrs := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
for _, addr := range response.Addrs {
select {
case outChan <- addr:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
}

for event := range events {
if event.Type == notif.FinalPeer {
err := sendAddrs(event.Responses)
if err != nil {
return
}
}
}
}()

go func() {
defer close(events)
pi, err := dht.FindPeer(ctx, peer.ID(p))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
return
}

notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
Responses: []*pstore.PeerInfo{&pi},
})
}()

return outChan, nil
}

func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan coreiface.PeerID, error) {
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}

dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}

p, err = api.ResolvePath(ctx, p)
if err != nil {
return nil, err
}

c := p.Cid()

numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
}

outChan := make(chan coreiface.PeerID)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)

pchan := dht.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)

sendProviders := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
select {
case outChan <- coreiface.PeerID(response.ID):
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

for event := range events {
if event.Type == notif.Provider {
err := sendProviders(event.Responses)
if err != nil {
return
}
}
}
}()

go func() {
defer close(events)
for p := range pchan {
np := p
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Provider,
Responses: []*pstore.PeerInfo{&np},
})
}
}()

return outChan, nil
}

func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
settings, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}

if api.node.Routing == nil {
return errors.New("cannot provide in offline mode")
}

if len(api.node.PeerHost.Network().Conns()) == 0 {
return errors.New("cannot provide, no connected peers")
}

c := path.Cid()

has, err := api.node.Blockstore.Has(c)
if err != nil {
return err
}

if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
}

//TODO: either remove or use
//outChan := make(chan interface{})

//events := make(chan *notif.QueryEvent)
//ctx = notif.RegisterForQueryEvents(ctx, events)

/*go func() {
defer close(outChan)
for range events {
select {
case <-ctx.Done():
return
default:
}
}
}()*/

//defer close(events)
if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c})
} else {
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
}
if err != nil {
return err
}

return nil
}

func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()

err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
}

for _, k := range kset.Keys() {
if provided.Has(k) {
continue
}

err = r.Provide(ctx, k, true)
if err != nil {
return err
}
provided.Add(k)
}
}

return nil
}
Loading