Skip to content

Commit

Permalink
rename the holepunch.HolePunchService to holepunch.Service
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Aug 30, 2021
1 parent 2bae307 commit 997a2a0
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 32 deletions.
4 changes: 2 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type BasicHost struct {
network network.Network
mux *msmux.MultistreamMuxer
ids *identify.IDService
hps *holepunch.HolePunchService
hps *holepunch.Service
pings *ping.PingService
natmgr NATManager
maResolver *madns.Resolver
Expand Down Expand Up @@ -216,7 +216,7 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost
}

if opts.EnableHolePunching {
h.hps, err = holepunch.NewHolePunchService(h, h.ids, opts.HolePunchingOptions...)
h.hps, err = holepunch.NewService(h, h.ids, opts.HolePunchingOptions...)
if err != nil {
return nil, fmt.Errorf("failed to create hole punch service: %w", err)
}
Expand Down
30 changes: 15 additions & 15 deletions p2p/protocol/holepunch/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ var (
log = logging.Logger("p2p-holepunch")
)

// The HolePunchService is used to make direct connections with a peer via hole-punching.
type HolePunchService struct {
// The Service is used to make direct connections with a peer via hole-punching.
type Service struct {
ctx context.Context
ctxCancel context.CancelFunc

Expand All @@ -58,16 +58,16 @@ type HolePunchService struct {
active map[peer.ID]struct{}
}

type Option func(*HolePunchService) error
type Option func(*Service) error

// NewHolePunchService creates a new service that can be used for hole punching
func NewHolePunchService(h host.Host, ids *identify.IDService, opts ...Option) (*HolePunchService, error) {
// NewService creates a new service that can be used for hole punching
func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service, error) {
if ids == nil {
return nil, errors.New("identify service can't be nil")
}

ctx, cancel := context.WithCancel(context.Background())
hs := &HolePunchService{
hs := &Service{
ctx: ctx,
ctxCancel: cancel,
host: h,
Expand All @@ -88,15 +88,15 @@ func NewHolePunchService(h host.Host, ids *identify.IDService, opts ...Option) (
}

// Close closes the Hole Punch Service.
func (hs *HolePunchService) Close() error {
func (hs *Service) Close() error {
hs.ctxCancel()
hs.refCount.Wait()
return nil
}

// initiateHolePunch opens a new hole punching coordination stream,
// exchanges the addresses and measures the RTT.
func (hs *HolePunchService) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration, error) {
func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration, error) {
hpCtx := network.WithUseTransient(hs.ctx, "hole-punch")
sCtx := network.WithNoDial(hpCtx, "hole-punch")
str, err := hs.host.NewStream(sCtx, rp, Protocol)
Expand Down Expand Up @@ -148,7 +148,7 @@ func (hs *HolePunchService) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.

// attempts to make a direct connection with the remote peer of `relayConn` by co-ordinating a hole punch over
// the given relay connection `relayConn`.
func (hs *HolePunchService) HolePunch(rp peer.ID) error {
func (hs *Service) HolePunch(rp peer.ID) error {
// short-circuit check to see if we already have a direct connection
for _, c := range hs.host.Network().ConnsToPeer(rp) {
if !isRelayAddress(c.RemoteMultiaddr()) {
Expand Down Expand Up @@ -214,12 +214,12 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error {
return fmt.Errorf("all retries for hole punch with peer %s failed", rp)
}

func (hs *HolePunchService) handlerError(p peer.ID, err error) {
func (hs *Service) handlerError(p peer.ID, err error) {
hs.tracer.ProtocolError(p, err)
log.Warn(err)
}

func (hs *HolePunchService) handleNewStream(s network.Stream) {
func (hs *Service) handleNewStream(s network.Stream) {
log.Infof("got hole punch request from peer %s", s.Conn().RemotePeer().Pretty())
_ = s.SetDeadline(time.Now().Add(StreamTimeout))
rp := s.Conn().RemotePeer()
Expand Down Expand Up @@ -286,7 +286,7 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) {
}
}

func (hs *HolePunchService) holePunchConnect(pi peer.AddrInfo, attempt int) error {
func (hs *Service) holePunchConnect(pi peer.AddrInfo, attempt int) error {
holePunchCtx := network.WithSimultaneousConnect(hs.ctx, "hole-punching")
forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching")
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout)
Expand Down Expand Up @@ -330,10 +330,10 @@ func addrsFromBytes(bzs [][]byte) []ma.Multiaddr {
return addrs
}

type netNotifiee HolePunchService
type netNotifiee Service

func (nn *netNotifiee) HolePunchService() *HolePunchService {
return (*HolePunchService)(nn)
func (nn *netNotifiee) HolePunchService() *Service {
return (*Service)(nn)
}

func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) {
Expand Down
54 changes: 41 additions & 13 deletions p2p/protocol/holepunch/coordination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package holepunch_test
import (
"context"
"net"
"sync"
"testing"
"time"

Expand All @@ -22,19 +23,34 @@ import (
)

type mockEventTracer struct {
mutex sync.Mutex
events []*holepunch.Event
}

func (m *mockEventTracer) Trace(evt *holepunch.Event) {
m.mutex.Lock()
m.events = append(m.events, evt)
m.mutex.Unlock()
}

func (m *mockEventTracer) getEvents() []*holepunch.Event {
m.mutex.Lock()
defer m.mutex.Unlock()
events := make([]*holepunch.Event, 0, len(m.events))
for _, ev := range m.events {
events = append(events, ev)
}
return events
}

var _ holepunch.EventTracer = &mockEventTracer{}

func TestNoHolePunchIfDirectConnExists(t *testing.T) {
tr := &mockEventTracer{}
h1, hps := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
defer h1.Close()
h2, _ := mkHostWithHolePunchSvc(t)
defer h2.Close()
require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
Expand All @@ -48,7 +64,7 @@ func TestNoHolePunchIfDirectConnExists(t *testing.T) {
require.NoError(t, hps.HolePunch(h2.ID()))
require.Equal(t, len(h1.Network().ConnsToPeer(h2.ID())), nc1)
require.Equal(t, len(h2.Network().ConnsToPeer(h1.ID())), nc2)
require.Empty(t, tr.events)
require.Empty(t, tr.getEvents())
}

func TestDirectDialWorks(t *testing.T) {
Expand All @@ -59,7 +75,9 @@ func TestDirectDialWorks(t *testing.T) {

tr := &mockEventTracer{}
h1, h1ps := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
defer h1.Close()
h2, _ := mkHostWithHolePunchSvc(t)
defer h2.Close()
h2.RemoveStreamHandler(holepunch.Protocol)
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.ConnectedAddrTTL)

Expand All @@ -68,8 +86,9 @@ func TestDirectDialWorks(t *testing.T) {
require.NoError(t, h1ps.HolePunch(h2.ID()))
require.GreaterOrEqual(t, len(h1.Network().ConnsToPeer(h2.ID())), 1)
require.GreaterOrEqual(t, len(h2.Network().ConnsToPeer(h1.ID())), 1)
require.Len(t, tr.events, 1)
require.Equal(t, tr.events[0].Type, holepunch.DirectDialEvtT)
events := tr.getEvents()
require.Len(t, events, 1)
require.Equal(t, events[0].Type, holepunch.DirectDialEvtT)
}

func TestEndToEndSimConnect(t *testing.T) {
Expand All @@ -78,8 +97,10 @@ func TestEndToEndSimConnect(t *testing.T) {
manet.Private4 = []*net.IPNet{}
tr := &mockEventTracer{}
h1, _ := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
defer h1.Close()
r := mkRelay(t, context.Background())
h2, _ := mkHostWithStaticAutoRelay(t, context.Background(), r)
defer h2.Close()
manet.Private4 = cpy

// h1 has a relay addr
Expand All @@ -101,10 +122,11 @@ func TestEndToEndSimConnect(t *testing.T) {
ensureDirectConn(t, h1, h2)
// ensure no hole-punching streams are open on either side
ensureNoHolePunchingStream(t, h1, h2)
require.Len(t, tr.events, 3)
require.Equal(t, tr.events[0].Type, holepunch.StartHolePunchEvtT)
require.Equal(t, tr.events[1].Type, holepunch.HolePunchAttemptEvtT)
require.Equal(t, tr.events[2].Type, holepunch.EndHolePunchEvtT)
events := tr.getEvents()
require.Len(t, events, 3)
require.Equal(t, events[0].Type, holepunch.StartHolePunchEvtT)
require.Equal(t, events[1].Type, holepunch.HolePunchAttemptEvtT)
require.Equal(t, events[2].Type, holepunch.EndHolePunchEvtT)
}

func TestFailuresOnInitiator(t *testing.T) {
Expand Down Expand Up @@ -150,11 +172,14 @@ func TestFailuresOnInitiator(t *testing.T) {
// all addrs should be marked as public
cpy := manet.Private4
manet.Private4 = []*net.IPNet{}
defer func() { manet.Private4 = cpy }()
tr := &mockEventTracer{}
h1, h1ps := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
defer h1.Close()
r := mkRelay(t, context.Background())
defer r.Close()
h2, _ := mkHostWithStaticAutoRelay(t, context.Background(), r)
manet.Private4 = cpy
defer h2.Close()

// h1 has a relay addr
// h2 should connect to the relay addr
Expand Down Expand Up @@ -225,7 +250,9 @@ func TestFailuresOnResponder(t *testing.T) {

tr := &mockEventTracer{}
h1, _ := mkHostWithHolePunchSvc(t)
defer h1.Close()
h2, _ := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
defer h2.Close()
connect(t, context.Background(), h1, h2)

s, err := h1.NewStream(context.Background(), h2.ID(), holepunch.Protocol)
Expand All @@ -235,7 +262,8 @@ func TestFailuresOnResponder(t *testing.T) {

getTracerError := func(tr *mockEventTracer) []string {
var errs []string
for _, ev := range tr.events {
events := tr.getEvents()
for _, ev := range events {
if errEv, ok := ev.Evt.(*holepunch.ProtocolErrorEvt); ok {
errs = append(errs, errEv.Error)
}
Expand Down Expand Up @@ -304,7 +332,7 @@ func connect(t *testing.T, ctx context.Context, h1, h2 host.Host) {
require.GreaterOrEqual(t, len(h1.Network().ConnsToPeer(h2.ID())), 1)
}

func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Host) (host.Host, *holepunch.HolePunchService) {
func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Host) (host.Host, *holepunch.Service) {
pi := peer.AddrInfo{
ID: relay.ID(),
Addrs: relay.Addrs(),
Expand All @@ -315,7 +343,7 @@ func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Hos
require.NoError(t, err)
ids, err := identify.NewIDService(h)
require.NoError(t, err)
hps, err := holepunch.NewHolePunchService(h, ids)
hps, err := holepunch.NewService(h, ids)
require.NoError(t, err)

// wait till we have a relay addr
Expand All @@ -337,12 +365,12 @@ func mkRelay(t *testing.T, ctx context.Context) host.Host {
return h
}

func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host, *holepunch.HolePunchService) {
func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host, *holepunch.Service) {
h, err := libp2p.New(context.Background())
require.NoError(t, err)
ids, err := identify.NewIDService(h)
require.NoError(t, err)
hps, err := holepunch.NewHolePunchService(h, ids, opts...)
hps, err := holepunch.NewService(h, ids, opts...)
require.NoError(t, err)
return h, hps
}
4 changes: 2 additions & 2 deletions p2p/protocol/holepunch/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

// WithTracer is a HolePunchService option that enables hole punching tracing
// WithTracer is a Service option that enables hole punching tracing
func WithTracer(tr EventTracer) Option {
return func(hps *HolePunchService) error {
return func(hps *Service) error {
hps.tracer = &Tracer{tr: tr, self: hps.host.ID()}
return nil
}
Expand Down

0 comments on commit 997a2a0

Please sign in to comment.