Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't block on DiscoverHandler #115

Merged
merged 1 commit into from
Sep 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions p2p/discovery/mdns.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package discovery

import (
"context"
"errors"
"io"
"io/ioutil"
Expand Down Expand Up @@ -60,7 +61,7 @@ func getDialableListenAddrs(ph host.Host) ([]*net.TCPAddr, error) {
return out, nil
}

func NewMdnsService(peerhost host.Host, interval time.Duration) (Service, error) {
func NewMdnsService(ctx context.Context, peerhost host.Host, interval time.Duration) (Service, error) {

// TODO: dont let mdns use logging...
golog.SetOutput(ioutil.Discard)
Expand Down Expand Up @@ -99,7 +100,7 @@ func NewMdnsService(peerhost host.Host, interval time.Duration) (Service, error)
interval: interval,
}

go s.pollForEntries()
go s.pollForEntries(ctx)

return s, nil
}
Expand All @@ -108,38 +109,50 @@ func (m *mdnsService) Close() error {
return m.server.Shutdown()
}

func (m *mdnsService) pollForEntries() {
func (m *mdnsService) pollForEntries(ctx context.Context) {

ticker := time.NewTicker(m.interval)
for range ticker.C {
entriesCh := make(chan *mdns.ServiceEntry, 16)
go func() {
for entry := range entriesCh {
m.handleEntry(entry)
for {
select {
case <-ticker.C:
entriesCh := make(chan *mdns.ServiceEntry, 16)
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is incorrect-- how does it pass tests?

this chan creation and goroutine is here because there may be mdns.Query errors for a single peer. the way you moved it out to the top of pollForEntries will make it so that it will close the chan exit after the first error-- and it may even call close(entriesCh) multiple times (!!!).

move the chan creation + goroutine back to within the for loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah... I was wondering where we closed that

for entry := range entriesCh {
m.handleEntry(entry)
}
}()

log.Debug("starting mdns query")
qp := &mdns.QueryParam{
Domain: "local",
Entries: entriesCh,
Service: ServiceTag,
Timeout: time.Second * 5,
}
}()

qp := mdns.QueryParam{}
qp.Domain = "local"
qp.Entries = entriesCh
qp.Service = ServiceTag
qp.Timeout = time.Second * 5

err := mdns.Query(&qp)
if err != nil {
log.Error("mdns lookup error: ", err)
err := mdns.Query(qp)
if err != nil {
log.Error("mdns lookup error: ", err)
}
close(entriesCh)
log.Debug("mdns query complete")
case <-ctx.Done():
log.Debug("mdns service halting")
return
}
close(entriesCh)
}
}

func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {
log.Debugf("Handling MDNS entry: %s:%d %s", e.AddrV4, e.Port, e.Info)
mpeer, err := peer.IDB58Decode(e.Info)
if err != nil {
log.Warning("Error parsing peer ID from mdns entry: ", err)
return
}

if mpeer == m.host.ID() {
log.Debug("got our own mdns entry, skipping")
return
}

Expand All @@ -159,7 +172,7 @@ func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {

m.lk.Lock()
for _, n := range m.notifees {
n.HandlePeerFound(pi)
go n.HandlePeerFound(pi)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like this is the only thing this changeset should change. (plus the logs)

}
m.lk.Unlock()
}
Expand Down
51 changes: 51 additions & 0 deletions p2p/discovery/mdns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package discovery

import (
"context"
"testing"
"time"

host "github.com/libp2p/go-libp2p/p2p/host"
netutil "github.com/libp2p/go-libp2p/p2p/test/util"

pstore "github.com/ipfs/go-libp2p-peerstore"
)

type DiscoveryNotifee struct {
h host.Host
}

func (n *DiscoveryNotifee) HandlePeerFound(pi pstore.PeerInfo) {
n.h.Connect(context.Background(), pi)
}

func TestMdnsDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a := netutil.GenHostSwarm(t, ctx)
b := netutil.GenHostSwarm(t, ctx)

sa, err := NewMdnsService(ctx, a, time.Second)
if err != nil {
t.Fatal(err)
}

sb, err := NewMdnsService(ctx, b, time.Second)
if err != nil {
t.Fatal(err)
}

_ = sb

n := &DiscoveryNotifee{a}

sa.RegisterNotifee(n)

time.Sleep(time.Second * 2)

err = a.Connect(ctx, pstore.PeerInfo{ID: b.ID()})
if err != nil {
t.Fatal(err)
}
}